feat: structured logging followup (#248)

This commit is contained in:
Martin Kobetic 2022-05-30 11:55:30 -04:00 committed by GitHub
parent 56c8ef705a
commit 7c44369def
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 245 additions and 227 deletions

View File

@ -8,9 +8,11 @@
package logging package logging
import ( import (
"net"
"time" "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
@ -110,3 +112,18 @@ func HexBytes(key string, bytes []byte) zap.Field {
func (bytes hexBytes) String() string { func (bytes hexBytes) String() string {
return hexutil.Encode(bytes) return hexutil.Encode(bytes)
} }
// ENode creates a field for ENR node.
func ENode(key string, node *enode.Node) zap.Field {
return zap.Stringer(key, node)
}
// TCPAddr creates a field for TCP v4/v6 address and port
func TCPAddr(key string, ip net.IP, port int) zap.Field {
return zap.Stringer(key, &net.TCPAddr{IP: ip, Port: port})
}
// UDPAddr creates a field for UDP v4/v6 address and port
func UDPAddr(key string, ip net.IP, port int) zap.Field {
return zap.Stringer(key, &net.UDPAddr{IP: ip, Port: port})
}

View File

@ -9,25 +9,15 @@ import (
"net" "net"
"testing" "testing"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"
) )
var log *zap.SugaredLogger = nil
func Logger() *zap.SugaredLogger {
if log == nil {
l, _ := zap.NewDevelopment()
log = l.Sugar()
}
return log
}
// GetHostAddress returns the first listen address used by a host // GetHostAddress returns the first listen address used by a host
func GetHostAddress(ha host.Host) ma.Multiaddr { func GetHostAddress(ha host.Host) ma.Multiaddr {
return ha.Addrs()[0] return ha.Addrs()[0]

View File

@ -16,11 +16,11 @@ import (
// Server runs and controls a HTTP pprof interface. // Server runs and controls a HTTP pprof interface.
type Server struct { type Server struct {
server *http.Server server *http.Server
log *zap.SugaredLogger log *zap.Logger
} }
// NewMetricsServer creates a prometheus server on a particular interface and port // NewMetricsServer creates a prometheus server on a particular interface and port
func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server { func NewMetricsServer(address string, port int, log *zap.Logger) *Server {
p := Server{ p := Server{
log: log.Named("metrics"), log: log.Named("metrics"),
} }
@ -32,12 +32,12 @@ func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server
pe, err := prometheus.NewExporter(prometheus.Options{}) pe, err := prometheus.NewExporter(prometheus.Options{})
if err != nil { if err != nil {
p.log.Fatalf("Failed to create the Prometheus stats exporter: %v", err) p.log.Fatal("creating Prometheus stats exporter", zap.Error(err))
} }
view.RegisterExporter(pe) view.RegisterExporter(pe)
p.log.Info(fmt.Sprintf("Starting server at %s:%d", address, port)) p.log.Info("starting server", zap.String("address", address), zap.Int("port", port))
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/metrics", pe) mux.Handle("/metrics", pe)
@ -58,7 +58,7 @@ func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server
metrics.PeersView, metrics.PeersView,
metrics.DialsView, metrics.DialsView,
); err != nil { ); err != nil {
p.log.Fatalf("Failed to register views: %v", err) p.log.Fatal("registering views", zap.Error(err))
} }
p.server = &http.Server{ p.server = &http.Server{
@ -71,14 +71,14 @@ func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server
// Start executes the HTTP server in the background. // Start executes the HTTP server in the background.
func (p *Server) Start() { func (p *Server) Start() {
p.log.Info("server stopped ", p.server.ListenAndServe()) p.log.Info("server stopped ", zap.Error(p.server.ListenAndServe()))
} }
// Stop shuts down the prometheus server // Stop shuts down the prometheus server
func (p *Server) Stop(ctx context.Context) error { func (p *Server) Stop(ctx context.Context) error {
err := p.server.Shutdown(ctx) err := p.server.Shutdown(ctx)
if err != nil { if err != nil {
p.log.Error("error while stopping server", err) p.log.Error("stopping server", zap.Error(err))
return err return err
} }

View File

@ -10,7 +10,7 @@ import (
) )
func TestStartAndStopMetricsServer(t *testing.T) { func TestStartAndStopMetricsServer(t *testing.T) {
server := NewMetricsServer("0.0.0.0", 9876, utils.Logger().Sugar()) server := NewMetricsServer("0.0.0.0", 9876, utils.Logger())
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)

View File

@ -105,7 +105,7 @@ func Execute(options Options) {
var metricsServer *metrics.Server var metricsServer *metrics.Server
if options.Metrics.Enable { if options.Metrics.Enable {
metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, logger.Sugar()) metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, logger)
go metricsServer.Start() go metricsServer.Start()
} }
@ -190,7 +190,7 @@ func Execute(options Options) {
if options.Store.Enable { if options.Store.Enable {
nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages)) nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages))
if options.UseDB { if options.UseDB {
dbStore, err := persistence.NewDBStore(logger.Sugar(), persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration()))
failOnErr(err, "DBStore") failOnErr(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
} else { } else {
@ -299,7 +299,7 @@ func Execute(options Options) {
var rpcServer *rpc.WakuRpc var rpcServer *rpc.WakuRpc
if options.RPCServer.Enable { if options.RPCServer.Enable {
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger.Sugar()) rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger)
rpcServer.Start() rpcServer.Start()
} }

View File

@ -2,7 +2,6 @@ package persistence
import ( import (
"database/sql" "database/sql"
"fmt"
"time" "time"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
@ -20,7 +19,7 @@ type MessageProvider interface {
type DBStore struct { type DBStore struct {
MessageProvider MessageProvider
db *sql.DB db *sql.DB
log *zap.SugaredLogger log *zap.Logger
maxMessages int maxMessages int
maxDuration time.Duration maxDuration time.Duration
@ -67,7 +66,7 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
// Creates a new DB store using the db specified via options. // Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist and // It will create a messages table if it does not exist and
// clean up records according to the retention policy used // clean up records according to the retention policy used
func NewDBStore(log *zap.SugaredLogger, options ...DBOption) (*DBStore, error) { func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
result := new(DBStore) result := new(DBStore)
result.log = log.Named("dbstore") result.log = log.Named("dbstore")
@ -124,7 +123,7 @@ func (d *DBStore) cleanOlderRecords() error {
return err return err
} }
elapsed := time.Since(start) elapsed := time.Since(start)
d.log.Debug(fmt.Sprintf("Deleting older records from the DB took %s", elapsed)) d.log.Debug("deleting older records from the DB", zap.Duration("duration", elapsed))
} }
// Limit number of records to a max N // Limit number of records to a max N
@ -136,7 +135,7 @@ func (d *DBStore) cleanOlderRecords() error {
return err return err
} }
elapsed := time.Since(start) elapsed := time.Since(start)
d.log.Debug(fmt.Sprintf("Deleting excess records from the DB took %s", elapsed)) d.log.Debug("deleting excess records from the DB", zap.Duration("duration", elapsed))
} }
return nil return nil
@ -171,7 +170,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
elapsed := time.Since(start) elapsed := time.Since(start)
d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed)) d.log.Info("loading records from the DB", zap.Duration("duration", elapsed))
}() }()
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC")
@ -194,7 +193,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
err = rows.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version) err = rows.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version)
if err != nil { if err != nil {
d.log.Fatal(err) d.log.Fatal("scanning next row", zap.Error(err))
} }
msg := new(pb.WakuMessage) msg := new(pb.WakuMessage)
@ -213,7 +212,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
result = append(result, record) result = append(result, record)
} }
d.log.Info(fmt.Sprintf("DB returned %d records", len(result))) d.log.Info("DB returned records", zap.Int("count", len(result)))
err = rows.Err() err = rows.Err()
if err != nil { if err != nil {

View File

@ -33,7 +33,7 @@ func createIndex(digest []byte, receiverTime int64) *pb.Index {
func TestDbStore(t *testing.T) { func TestDbStore(t *testing.T) {
db := NewMock() db := NewMock()
option := WithDB(db) option := WithDB(db)
store, err := NewDBStore(utils.Logger().Sugar(), option) store, err := NewDBStore(utils.Logger(), option)
require.NoError(t, err) require.NoError(t, err)
res, err := store.GetAll() res, err := store.GetAll()
@ -54,7 +54,7 @@ func TestDbStore(t *testing.T) {
func TestStoreRetention(t *testing.T) { func TestStoreRetention(t *testing.T) {
db := NewMock() db := NewMock()
store, err := NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second))
require.NoError(t, err) require.NoError(t, err)
insertTime := time.Now() insertTime := time.Now()
@ -74,7 +74,7 @@ func TestStoreRetention(t *testing.T) {
// This step simulates starting go-waku again from scratch // This step simulates starting go-waku again from scratch
store, err = NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second))
require.NoError(t, err) require.NoError(t, err)
dbResults, err = store.GetAll() dbResults, err = store.GetAll()

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"fmt"
"math" "math"
"math/rand" "math/rand"
"net" "net"
@ -20,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-discover/discover" "github.com/status-im/go-discover/discover"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/utils" "github.com/status-im/go-waku/waku/v2/utils"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -38,7 +38,7 @@ type DiscoveryV5 struct {
NAT nat.Interface NAT nat.Interface
quit chan struct{} quit chan struct{}
log *zap.SugaredLogger log *zap.Logger
wg *sync.WaitGroup wg *sync.WaitGroup
@ -101,7 +101,7 @@ func DefaultOptions() []DiscoveryV5Option {
} }
} }
func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, log *zap.SugaredLogger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters) params := new(discV5Parameters)
optList := DefaultOptions() optList := DefaultOptions()
optList = append(optList, opts...) optList = append(optList, opts...)
@ -156,7 +156,7 @@ func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.Privat
}, nil }, nil
} }
func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.SugaredLogger) (*enode.LocalNode, error) { func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) {
db, err := enode.OpenDB("") db, err := enode.OpenDB("")
if err != nil { if err != nil {
return nil, err return nil, err
@ -170,13 +170,13 @@ func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, waku
if udpPort > 0 && udpPort <= math.MaxUint16 { if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
} else { } else {
log.Error("could not set udpPort ", zap.Int("port", udpPort)) log.Error("setting udpPort", zap.Int("port", udpPort))
} }
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
} else { } else {
log.Error("could not set tcpPort ", zap.Int("port", ipAddr.Port)) log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
} }
if advertiseAddr != nil { if advertiseAddr != nil {
@ -212,8 +212,10 @@ func (d *DiscoveryV5) listen() error {
d.listener = listener d.listener = listener
d.log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.localnode.Node().TCP())) d.log.Info("started Discovery V5",
d.log.Info("Discv5: discoverable ENR ", d.localnode.Node()) zap.Stringer("listening", d.udpAddr),
logging.TCPAddr("advertising", d.localnode.Node().IP(), d.localnode.Node().TCP()))
d.log.Info("Discovery V5: discoverable ENR ", logging.ENode("enr", d.localnode.Node()))
return nil return nil
} }
@ -243,7 +245,7 @@ func (d *DiscoveryV5) Stop() {
d.listener.Close() d.listener.Close()
d.listener = nil d.listener = nil
d.log.Info("Stopped Discovery V5") d.log.Info("stopped Discovery V5")
d.wg.Wait() d.wg.Wait()
} }
@ -270,8 +272,8 @@ func (d *DiscoveryV5) UpdateAddr(addr *net.TCPAddr) error {
d.localnode.SetStaticIP(addr.IP) d.localnode.SetStaticIP(addr.IP)
d.localnode.Set(enr.TCP(uint16(addr.Port))) // lgtm [go/incorrect-integer-conversion] d.localnode.Set(enr.TCP(uint16(addr.Port))) // lgtm [go/incorrect-integer-conversion]
d.log.Info(fmt.Sprintf("Updated Discovery V5 node: %s:%d", d.localnode.Node().IP(), d.localnode.Node().TCP())) d.log.Info("updated Discovery V5 node address", logging.TCPAddr("address", d.localnode.Node().IP(), d.localnode.Node().TCP()))
d.log.Info("Discovery V5 ", d.localnode.Node()) d.log.Info("Discovery V5", logging.ENode("enr", d.localnode.Node()))
return nil return nil
} }
@ -298,7 +300,7 @@ func hasTCPPort(node *enode.Node) bool {
enrTCP := new(enr.TCP) enrTCP := new(enr.TCP)
if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil { if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil {
if !enr.IsNotFound(err) { if !enr.IsNotFound(err) {
utils.Logger().Named("discv5").Error("could not retrieve port for enr ", zap.Any("node", node)) utils.Logger().Named("discv5").Error("retrieving port for enr", logging.ENode("enr", node))
} }
return false return false
} }
@ -319,7 +321,7 @@ func evaluateNode(node *enode.Node) bool {
_, err := utils.EnodeToPeerInfo(node) _, err := utils.EnodeToPeerInfo(node)
if err != nil { if err != nil {
utils.Logger().Named("discv5").Error("could not obtain peer info from enode:", zap.Error(err)) utils.Logger().Named("discv5").Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err))
return false return false
} }
@ -358,13 +360,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi
addresses, err := utils.Multiaddress(iterator.Node()) addresses, err := utils.Multiaddress(iterator.Node())
if err != nil { if err != nil {
d.log.Error(err) d.log.Error("extracting multiaddrs from enr", zap.Error(err))
continue continue
} }
peerAddrs, err := peer.AddrInfosFromP2pAddrs(addresses...) peerAddrs, err := peer.AddrInfosFromP2pAddrs(addresses...)
if err != nil { if err != nil {
d.log.Error(err) d.log.Error("converting multiaddrs to addrinfos", zap.Error(err))
continue continue
} }

View File

@ -47,19 +47,19 @@ func TestDiscV5(t *testing.T) {
host1, _, prvKey1 := createHost(t) host1, _, prvKey1 := createHost(t)
udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err) require.NoError(t, err)
d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort1)) d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort1))
require.NoError(t, err) require.NoError(t, err)
host2, _, prvKey2 := createHost(t) host2, _, prvKey2 := createHost(t)
udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err) require.NoError(t, err)
d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
require.NoError(t, err) require.NoError(t, err)
host3, _, prvKey3 := createHost(t) host3, _, prvKey3 := createHost(t)
udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err) require.NoError(t, err)
d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
require.NoError(t, err) require.NoError(t, err)
defer d1.Stop() defer d1.Stop()

View File

@ -86,7 +86,7 @@ type WakuNode struct {
} }
func defaultStoreFactory(w *WakuNode) store.Store { func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar()) return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
} }
// New is used to instantiate a WakuNode using a set of WakuNodeOptions // New is used to instantiate a WakuNode using a set of WakuNodeOptions
@ -218,7 +218,7 @@ func (w *WakuNode) onAddrChange() {
if w.opts.enableDiscV5 { if w.opts.enableDiscV5 {
err := w.discoveryV5.UpdateAddr(addr) err := w.discoveryV5.UpdateAddr(addr)
if err != nil { if err != nil {
w.log.Error("updating DiscV5 address with IP", zap.Stringer("ip", addr.IP), zap.Int("port", addr.Port), zap.Error(err)) w.log.Error("updating DiscV5 address with IP", zap.Stringer("address", addr), zap.Error(err))
continue continue
} }
} }
@ -235,7 +235,7 @@ func (w *WakuNode) logAddress(addr ma.Multiaddr) {
if err != nil { if err != nil {
logger.Error("obtaining ENR record from multiaddress", zap.Error(err)) logger.Error("obtaining ENR record from multiaddress", zap.Error(err))
} else { } else {
logger.Info("listening", zap.Stringer("ENR", enr), zap.Stringer("ip", ip)) logger.Info("listening", logging.ENode("enr", enr), zap.Stringer("ip", ip))
} }
} }
} }
@ -283,7 +283,7 @@ func (w *WakuNode) checkForAddressChanges() {
func (w *WakuNode) Start() error { func (w *WakuNode) Start() error {
w.log.Info("Version details ", zap.String("commit", GitCommit)) w.log.Info("Version details ", zap.String("commit", GitCommit))
w.swap = swap.NewWakuSwap(w.log.Sugar(), []swap.SwapOption{ w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{
swap.WithMode(w.opts.swapMode), swap.WithMode(w.opts.swapMode),
swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold), swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold),
}...) }...)
@ -294,7 +294,7 @@ func (w *WakuNode) Start() error {
} }
if w.opts.enableFilter { if w.opts.enableFilter {
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log.Sugar(), w.opts.filterOpts...) filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...)
if err != nil { if err != nil {
return err return err
} }
@ -322,7 +322,7 @@ func (w *WakuNode) Start() error {
return err return err
} }
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log.Sugar()) w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log)
if w.opts.enableLightPush { if w.opts.enableLightPush {
if err := w.lightPush.Start(); err != nil { if err := w.lightPush.Start(); err != nil {
return err return err
@ -462,7 +462,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error { func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error {
var err error var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log.Sugar(), opts...) w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...)
if err != nil { if err != nil {
return err return err
} }
@ -492,7 +492,7 @@ func (w *WakuNode) mountDiscV5() error {
} }
var err error var err error
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log.Sugar(), discV5Options...) w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log, discV5Options...)
return err return err
} }

View File

@ -30,7 +30,7 @@ func TestWakuOptions(t *testing.T) {
advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
storeFactory := func(w *WakuNode) store.Store { storeFactory := func(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar()) return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
} }
options := []WakuNodeOption{ options := []WakuNodeOption{

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt"
"math" "math"
"sync" "sync"
@ -13,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio" "github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
@ -50,7 +50,7 @@ type (
isFullNode bool isFullNode bool
MsgC chan *protocol.Envelope MsgC chan *protocol.Envelope
wg *sync.WaitGroup wg *sync.WaitGroup
log *zap.SugaredLogger log *zap.Logger
filters *FilterMap filters *FilterMap
subscribers *Subscribers subscribers *Subscribers
@ -62,13 +62,13 @@ type (
// relay protocol. // relay protocol.
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.SugaredLogger, opts ...Option) (*WakuFilter, error) { func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
wf := new(WakuFilter) wf := new(WakuFilter)
wf.log = log.Named("filter") wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil { if err != nil {
wf.log.Error(err) wf.log.Error("creating tag map", zap.Error(err))
return nil, errors.New("could not start waku filter") return nil, errors.New("could not start waku filter")
} }
@ -92,17 +92,13 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *za
wf.wg.Add(1) wf.wg.Add(1)
go wf.FilterListener() go wf.FilterListener()
if wf.isFullNode { wf.log.Info("filter protocol started")
wf.log.Info("Filter protocol started")
} else {
wf.log.Info("Filter protocol started (only client mode)")
}
return wf, nil return wf, nil
} }
func (wf *WakuFilter) onRequest(s network.Stream) { func (wf *WakuFilter) onRequest(s network.Stream) {
defer s.Close() defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
filterRPCRequest := &pb.FilterRPC{} filterRPCRequest := &pb.FilterRPC{}
@ -110,11 +106,11 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
err := reader.ReadMsg(filterRPCRequest) err := reader.ReadMsg(filterRPCRequest)
if err != nil { if err != nil {
wf.log.Error("error reading request", err) logger.Error("reading request", zap.Error(err))
return return
} }
wf.log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) logger.Info("received request")
if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node. // We're on a light node.
@ -123,7 +119,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node
} }
wf.log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages)))
stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages)))) stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages))))
} else if filterRPCRequest.Request != nil && wf.isFullNode { } else if filterRPCRequest.Request != nil && wf.isFullNode {
// We're on a full node. // We're on a full node.
@ -132,29 +128,30 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
len := wf.subscribers.Append(subscriber) len := wf.subscribers.Append(subscriber)
wf.log.Info("filter full node, add a filter subscriber: ", subscriber.peer) logger.Info("adding subscriber")
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len))) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len)))
} else { } else {
peerId := s.Conn().RemotePeer() peerId := s.Conn().RemotePeer()
wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters) wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters)
wf.log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty()) logger.Info("removing subscriber")
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length()))) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length())))
} }
} else { } else {
wf.log.Error("can't serve request") logger.Error("can't serve request")
return return
} }
} }
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
logger := wf.log.With(logging.HostID("peer", subscriber.peer))
// We connect first so dns4 addresses are resolved (NewStream does not do it) // We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(wf.ctx, wf.h.Peerstore().PeerInfo(subscriber.peer)) err := wf.h.Connect(wf.ctx, wf.h.Peerstore().PeerInfo(subscriber.peer))
if err != nil { if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer) wf.subscribers.FlagAsFailure(subscriber.peer)
wf.log.Error("failed to connect to peer", err) logger.Error("connecting to peer", zap.Error(err))
return err return err
} }
@ -162,7 +159,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er
if err != nil { if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer) wf.subscribers.FlagAsFailure(subscriber.peer)
wf.log.Error("failed to open peer stream", err) logger.Error("opening peer stream", zap.Error(err))
//waku_filter_errors.inc(labelValues = [dialFailure]) //waku_filter_errors.inc(labelValues = [dialFailure])
return err return err
} }
@ -171,7 +168,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er
writer := protoio.NewDelimitedWriter(conn) writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC) err = writer.WriteMsg(pushRPC)
if err != nil { if err != nil {
wf.log.Error("failed to push messages to remote peer", err) logger.Error("pushing messages to peer", zap.Error(err))
wf.subscribers.FlagAsFailure(subscriber.peer) wf.subscribers.FlagAsFailure(subscriber.peer)
return nil return nil
} }
@ -188,25 +185,29 @@ func (wf *WakuFilter) FilterListener() {
handle := func(envelope *protocol.Envelope) error { // async handle := func(envelope *protocol.Envelope) error { // async
msg := envelope.Message() msg := envelope.Message()
topic := envelope.PubsubTopic() topic := envelope.PubsubTopic()
logger := wf.log.With(zap.Stringer("message", msg))
g := new(errgroup.Group) g := new(errgroup.Group)
// Each subscriber is a light node that earlier on invoked // Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node // a FilterRequest on this node
for subscriber := range wf.subscribers.Items() { for subscriber := range wf.subscribers.Items() {
logger := logger.With(logging.HostID("subscriber", subscriber.peer))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic {
wf.log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) logger.Info("pubsub topic mismatch",
zap.String("subscriberTopic", subscriber.filter.Topic),
zap.String("messageTopic", topic))
continue continue
} }
for _, filter := range subscriber.filter.ContentFilters { for _, filter := range subscriber.filter.ContentFilters {
if msg.ContentTopic == filter.ContentTopic { if msg.ContentTopic == filter.ContentTopic {
wf.log.Info("found matching contentTopic ", filter, msg) logger.Info("found matching content topic", zap.Stringer("filter", filter))
// Do a message push to light node // Do a message push to light node
wf.log.Info("pushing messages to light node: ", subscriber.peer) logger.Info("pushing message to light node")
g.Go(func() (err error) { g.Go(func() (err error) {
err = wf.pushMessage(subscriber, msg) err = wf.pushMessage(subscriber, msg)
if err != nil { if err != nil {
wf.log.Error(err) logger.Error("pushing message", zap.Error(err))
} }
return err return err
}) })
@ -221,7 +222,7 @@ func (wf *WakuFilter) FilterListener() {
for m := range wf.MsgC { for m := range wf.MsgC {
if err := handle(m); err != nil { if err := handle(m); err != nil {
wf.log.Error("failed to handle message", err) wf.log.Error("handling message", zap.Error(err))
} }
} }
} }
@ -274,10 +275,10 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
writer := protoio.NewDelimitedWriter(conn) writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request} filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request}
wf.log.Info("sending filterRPC: ", filterRPC) wf.log.Info("sending filterRPC", zap.Stringer("rpc", filterRPC))
err = writer.WriteMsg(filterRPC) err = writer.WriteMsg(filterRPC)
if err != nil { if err != nil {
wf.log.Error("failed to write message", err) wf.log.Error("sending filterRPC", zap.Error(err))
return return
} }
@ -342,7 +343,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi
remoteSubs, err := wf.requestSubscription(ctx, f, opts...) remoteSubs, err := wf.requestSubscription(ctx, f, opts...)
if err != nil || remoteSubs.RequestID == "" { if err != nil || remoteSubs.RequestID == "" {
// Failed to subscribe // Failed to subscribe
wf.log.Error("remote subscription to filter failed", err) wf.log.Error("requesting subscription", zap.Error(err))
return return
} }

View File

@ -14,7 +14,7 @@ type (
FilterSubscribeParameters struct { FilterSubscribeParameters struct {
host host.Host host host.Host
selectedPeer peer.ID selectedPeer peer.ID
log *zap.SugaredLogger log *zap.Logger
} }
FilterSubscribeOption func(*FilterSubscribeParameters) FilterSubscribeOption func(*FilterSubscribeParameters)
@ -40,22 +40,22 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
func WithAutomaticPeerSelection() FilterSubscribeOption { func WithAutomaticPeerSelection() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) { return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log.Desugar()) p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
params.log.Info("Error selecting peer: ", err) params.log.Info("selecting peer", zap.Error(err))
} }
} }
} }
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) { return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log.Desugar()) p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
params.log.Info("Error selecting peer: ", err) params.log.Info("selecting peer", zap.Error(err))
} }
} }
} }

View File

@ -25,7 +25,7 @@ func TestFilterOption(t *testing.T) {
params := new(FilterSubscribeParameters) params := new(FilterSubscribeParameters)
params.host = host params.host = host
params.log = utils.Logger().Sugar() params.log = utils.Logger()
for _, opt := range options { for _, opt := range options {
opt(params) opt(params)

View File

@ -23,7 +23,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger().Sugar()) relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic) sub, err := relay.SubscribeToTopic(context.Background(), topic)
@ -39,7 +39,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar()) filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger())
return filter, host return filter, host
} }
@ -69,7 +69,7 @@ func TestWakuFilter(t *testing.T) {
defer node2.Stop() defer node2.Stop()
defer sub2.Unsubscribe() defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar()) node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger())
broadcaster.Register(&testTopic, node2Filter.MsgC) broadcaster.Register(&testTopic, node2Filter.MsgC)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
@ -154,7 +154,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
defer node2.Stop() defer node2.Stop()
defer sub2.Unsubscribe() defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar(), WithTimeout(3*time.Second)) node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger(), WithTimeout(3*time.Second))
broadcaster.Register(&testTopic, node2Filter.MsgC) broadcaster.Register(&testTopic, node2Filter.MsgC)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)

View File

@ -4,13 +4,13 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt"
"math" "math"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio" "github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
@ -30,13 +30,13 @@ type WakuLightPush struct {
relay *relay.WakuRelay relay *relay.WakuRelay
ctx context.Context ctx context.Context
log *zap.SugaredLogger log *zap.Logger
started bool started bool
} }
// NewWakuRelay returns a new instance of Waku Lightpush struct // NewWakuRelay returns a new instance of Waku Lightpush struct
func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay, log *zap.SugaredLogger) *WakuLightPush { func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush {
wakuLP := new(WakuLightPush) wakuLP := new(WakuLightPush)
wakuLP.relay = relay wakuLP.relay = relay
wakuLP.ctx = ctx wakuLP.ctx = ctx
@ -66,7 +66,7 @@ func (wakuLp *WakuLightPush) IsClientOnly() bool {
func (wakuLP *WakuLightPush) onRequest(s network.Stream) { func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
defer s.Close() defer s.Close()
logger := wakuLP.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
requestPushRPC := &pb.PushRPC{} requestPushRPC := &pb.PushRPC{}
writer := protoio.NewDelimitedWriter(s) writer := protoio.NewDelimitedWriter(s)
@ -74,15 +74,15 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
err := reader.ReadMsg(requestPushRPC) err := reader.ReadMsg(requestPushRPC)
if err != nil { if err != nil {
wakuLP.log.Error("error reading request", err) logger.Error("reading request", zap.Error(err))
metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure") metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure")
return return
} }
wakuLP.log.Info(fmt.Sprintf("%s: lightpush message received from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) logger.Info("request received")
if requestPushRPC.Query != nil { if requestPushRPC.Query != nil {
wakuLP.log.Info("lightpush push request") logger.Info("push request")
response := new(pb.PushResponse) response := new(pb.PushResponse)
if !wakuLP.IsClientOnly() { if !wakuLP.IsClientOnly() {
pubSubTopic := requestPushRPC.Query.PubsubTopic pubSubTopic := requestPushRPC.Query.PubsubTopic
@ -101,7 +101,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
response.Info = "Totally" // TODO: ask about this response.Info = "Totally" // TODO: ask about this
} }
} else { } else {
wakuLP.log.Debug("no relay protocol present, unsuccessful push") logger.Debug("no relay protocol present, unsuccessful push")
response.IsSuccess = false response.IsSuccess = false
response.Info = "No relay protocol" response.Info = "No relay protocol"
} }
@ -112,18 +112,18 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
err = writer.WriteMsg(responsePushRPC) err = writer.WriteMsg(responsePushRPC)
if err != nil { if err != nil {
wakuLP.log.Error("error writing response", err) logger.Error("writing response", zap.Error(err))
_ = s.Reset() _ = s.Reset()
} else { } else {
wakuLP.log.Info(fmt.Sprintf("%s: response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) logger.Info("response sent")
} }
} }
if requestPushRPC.Response != nil { if requestPushRPC.Response != nil {
if requestPushRPC.Response.IsSuccess { if requestPushRPC.Response.IsSuccess {
wakuLP.log.Info("lightpush message success") logger.Info("request success")
} else { } else {
wakuLP.log.Info(fmt.Sprintf("lightpush message failure. info=%s", requestPushRPC.Response.Info)) logger.Info("request failure", zap.String("info=", requestPushRPC.Response.Info))
} }
} }
} }
@ -148,15 +148,17 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
return nil, ErrInvalidId return nil, ErrInvalidId
} }
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
// We connect first so dns4 addresses are resolved (NewStream does not do it) // We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer)) err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil { if err != nil {
logger.Error("connecting peer", zap.Error(err))
return nil, err return nil, err
} }
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil { if err != nil {
wakuLP.log.Info("failed to connect to remote peer", err) logger.Error("creating stream to peer", zap.Error(err))
metrics.RecordLightpushError(wakuLP.ctx, "dialError") metrics.RecordLightpushError(wakuLP.ctx, "dialError")
return nil, err return nil, err
} }
@ -166,7 +168,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
err := connOpt.Reset() err := connOpt.Reset()
if err != nil { if err != nil {
metrics.RecordLightpushError(wakuLP.ctx, "dialError") metrics.RecordLightpushError(wakuLP.ctx, "dialError")
wakuLP.log.Error("failed to reset connection", err) logger.Error("resetting connection", zap.Error(err))
} }
}() }()
@ -177,14 +179,14 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
err = writer.WriteMsg(pushRequestRPC) err = writer.WriteMsg(pushRequestRPC)
if err != nil { if err != nil {
wakuLP.log.Error("could not write request", err) logger.Error("writing request", zap.Error(err))
return nil, err return nil, err
} }
pushResponseRPC := &pb.PushRPC{} pushResponseRPC := &pb.PushRPC{}
err = reader.ReadMsg(pushResponseRPC) err = reader.ReadMsg(pushResponseRPC)
if err != nil { if err != nil {
wakuLP.log.Error("could not read response", err) logger.Error("reading response", zap.Error(err))
metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure") metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure")
return nil, err return nil, err
} }

View File

@ -14,7 +14,7 @@ type LightPushParameters struct {
host host.Host host host.Host
selectedPeer peer.ID selectedPeer peer.ID
requestId []byte requestId []byte
log *zap.SugaredLogger log *zap.Logger
} }
type LightPushOption func(*LightPushParameters) type LightPushOption func(*LightPushParameters)
@ -27,22 +27,22 @@ func WithPeer(p peer.ID) LightPushOption {
func WithAutomaticPeerSelection(host host.Host) LightPushOption { func WithAutomaticPeerSelection(host host.Host) LightPushOption {
return func(params *LightPushParameters) { return func(params *LightPushParameters) {
p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log.Desugar()) p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
params.log.Info("Error selecting peer: ", err) params.log.Info("selecting peer", zap.Error(err))
} }
} }
} }
func WithFastestPeerSelection(ctx context.Context) LightPushOption { func WithFastestPeerSelection(ctx context.Context) LightPushOption {
return func(params *LightPushParameters) { return func(params *LightPushParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log.Desugar()) p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
params.log.Info("Error selecting peer: ", err) params.log.Info("selecting peer", zap.Error(err))
} }
} }
} }

View File

@ -27,7 +27,7 @@ func TestLightPushOption(t *testing.T) {
params := new(LightPushParameters) params := new(LightPushParameters)
params.host = host params.host = host
params.log = utils.Logger().Sugar() params.log = utils.Logger()
for _, opt := range options { for _, opt := range options {
opt(params) opt(params)

View File

@ -25,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar()) relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic) sub, err := relay.SubscribeToTopic(context.Background(), topic)
@ -56,7 +56,7 @@ func TestWakuLightPush(t *testing.T) {
defer sub2.Unsubscribe() defer sub2.Unsubscribe()
ctx := context.Background() ctx := context.Background()
lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger().Sugar()) lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger())
err := lightPushNode2.Start() err := lightPushNode2.Start()
require.NoError(t, err) require.NoError(t, err)
defer lightPushNode2.Stop() defer lightPushNode2.Stop()
@ -66,7 +66,7 @@ func TestWakuLightPush(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200)) err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200))
@ -122,7 +122,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
err = client.Start() err = client.Start()
require.Errorf(t, err, "relay is required") require.Errorf(t, err, "relay is required")
@ -136,7 +136,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic) _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic)
require.Errorf(t, err, "no suitable remote peers") require.Errorf(t, err, "no suitable remote peers")

View File

@ -30,7 +30,7 @@ type WakuRelay struct {
host host.Host host host.Host
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
log *zap.SugaredLogger log *zap.Logger
bcaster v2.Broadcaster bcaster v2.Broadcaster
@ -52,7 +52,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
} }
// NewWakuRelay returns a new instance of a WakuRelay struct // NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.SugaredLogger, opts ...pubsub.Option) (*WakuRelay, error) { func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay) w := new(WakuRelay)
w.host = h w.host = h
w.wakuRelayTopics = make(map[string]*pubsub.Topic) w.wakuRelayTopics = make(map[string]*pubsub.Topic)
@ -109,7 +109,7 @@ func (w *WakuRelay) Topics() []string {
return result return result
} }
// SetPubSub is used to set an aimplementation of the pubsub system // SetPubSub is used to set an implementation of the pubsub system
func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) { func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
w.pubsub = pubSub w.pubsub = pubSub
} }
@ -144,7 +144,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
} }
w.relaySubs[topic] = sub w.relaySubs[topic] = sub
w.log.Info("Subscribing to topic ", topic) w.log.Info("subscribing to topic", zap.String("topic", topic))
} }
return sub, nil return sub, nil
@ -162,7 +162,7 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage,
} }
if !w.EnoughPeersToPublishToTopic(topic) { if !w.EnoughPeersToPublishToTopic(topic) {
return nil, errors.New("not enougth peers to publish") return nil, errors.New("not enough peers to publish")
} }
pubSubTopic, err := w.upsertTopic(topic) pubSubTopic, err := w.upsertTopic(topic)
@ -255,7 +255,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
if _, ok := w.relaySubs[topic]; !ok { if _, ok := w.relaySubs[topic]; !ok {
return fmt.Errorf("topics %s is not subscribed", (string)(topic)) return fmt.Errorf("topics %s is not subscribed", (string)(topic))
} }
w.log.Info("Unsubscribing from topic ", topic) w.log.Info("unsubscribing from topic", zap.String("topic", topic))
for _, sub := range w.subscriptions[topic] { for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe() sub.Unsubscribe()
@ -285,7 +285,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
for { for {
msg, err := sub.Next(ctx) msg, err := sub.Next(ctx)
if err != nil { if err != nil {
w.log.Error(fmt.Errorf("subscription failed: %w", err)) w.log.Error("getting message from subscription", zap.Error(err))
sub.Cancel() sub.Cancel()
close(msgChannel) close(msgChannel)
for _, subscription := range w.subscriptions[sub.Topic()] { for _, subscription := range w.subscriptions[sub.Topic()] {
@ -302,7 +302,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) { func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) {
ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay")) ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay"))
if err != nil { if err != nil {
w.log.Error(err) w.log.Error("creating tag map", zap.Error(err))
return return
} }
@ -333,7 +333,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *
stats.Record(ctx, metrics.Messages.M(1)) stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{} wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
w.log.Error("could not decode message", err) w.log.Error("decoding message", zap.Error(err))
return return
} }

View File

@ -20,7 +20,7 @@ func TestWakuRelay(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar()) relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger())
defer relay.Stop() defer relay.Stop()
require.NoError(t, err) require.NoError(t, err)

View File

@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
_ = s.storeMessage(msg1) _ = s.storeMessage(msg1)
_ = s.storeMessage(msg3) _ = s.storeMessage(msg3)
_ = s.storeMessage(msg5) _ = s.storeMessage(msg5)
@ -38,7 +38,7 @@ func TestResume(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
s1.Start(ctx) s1.Start(ctx)
defer s1.Stop() defer s1.Stop()
@ -56,7 +56,7 @@ func TestResume(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
s2.Start(ctx) s2.Start(ctx)
defer s2.Stop() defer s2.Stop()
@ -88,7 +88,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
s1.Start(ctx) s1.Start(ctx)
defer s1.Stop() defer s1.Stop()
@ -99,7 +99,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
s2.Start(ctx) s2.Start(ctx)
defer s2.Stop() defer s2.Stop()
@ -121,7 +121,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
s1.Start(ctx) s1.Start(ctx)
defer s1.Stop() defer s1.Stop()
@ -132,7 +132,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
s2.Start(ctx) s2.Start(ctx)
defer s2.Stop() defer s2.Stop()

View File

@ -5,7 +5,6 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt"
"math" "math"
"sort" "sort"
"strings" "strings"
@ -19,6 +18,7 @@ import (
"github.com/libp2p/go-msgio/protoio" "github.com/libp2p/go-msgio/protoio"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
@ -239,7 +239,7 @@ type WakuStore struct {
MsgC chan *protocol.Envelope MsgC chan *protocol.Envelope
wg *sync.WaitGroup wg *sync.WaitGroup
log *zap.SugaredLogger log *zap.Logger
started bool started bool
@ -259,7 +259,7 @@ type Store interface {
} }
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.SugaredLogger) *WakuStore { func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore) wakuStore := new(WakuStore)
wakuStore.msgProvider = p wakuStore.msgProvider = p
wakuStore.h = host wakuStore.h = host
@ -308,12 +308,14 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
start := time.Now() start := time.Now()
defer func() { defer func() {
elapsed := time.Since(start) elapsed := time.Since(start)
store.log.Info(fmt.Sprintf("Store initialization took %s", elapsed)) store.log.Info("Store initialization complete",
zap.Duration("duration", elapsed),
zap.Int("messages", store.messageQueue.Length()))
}() }()
storedMessages, err := (store.msgProvider).GetAll() storedMessages, err := (store.msgProvider).GetAll()
if err != nil { if err != nil {
store.log.Error("could not load DBProvider messages", err) store.log.Error("loading DBProvider messages", zap.Error(err))
metrics.RecordStoreError(ctx, "store_load_failure") metrics.RecordStoreError(ctx, "store_load_failure")
return return
} }
@ -328,8 +330,6 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
} }
metrics.RecordMessage(ctx, "stored", store.messageQueue.Length()) metrics.RecordMessage(ctx, "stored", store.messageQueue.Length())
store.log.Info(fmt.Sprintf("%d messages available in waku store", store.messageQueue.Length()))
} }
func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) error { func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) error {
@ -339,7 +339,7 @@ func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg
func (store *WakuStore) storeMessage(env *protocol.Envelope) error { func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
index, err := computeIndex(env) index, err := computeIndex(env)
if err != nil { if err != nil {
store.log.Error("could not calculate message index", err) store.log.Error("creating message index", zap.Error(err))
return err return err
} }
@ -356,7 +356,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
// TODO: Move this to a separate go routine if DB writes becomes a bottleneck // TODO: Move this to a separate go routine if DB writes becomes a bottleneck
err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored? err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored?
if err != nil { if err != nil {
store.log.Error("could not store message", err) store.log.Error("storing message", zap.Error(err))
metrics.RecordStoreError(store.ctx, "store_failure") metrics.RecordStoreError(store.ctx, "store_failure")
return err return err
} }
@ -374,7 +374,7 @@ func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
func (store *WakuStore) onRequest(s network.Stream) { func (store *WakuStore) onRequest(s network.Stream) {
defer s.Close() defer s.Close()
logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
historyRPCRequest := &pb.HistoryRPC{} historyRPCRequest := &pb.HistoryRPC{}
writer := protoio.NewDelimitedWriter(s) writer := protoio.NewDelimitedWriter(s)
@ -382,23 +382,27 @@ func (store *WakuStore) onRequest(s network.Stream) {
err := reader.ReadMsg(historyRPCRequest) err := reader.ReadMsg(historyRPCRequest)
if err != nil { if err != nil {
store.log.Error("error reading request", err) logger.Error("reading request", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure") metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return return
} }
logger = logger.With(zap.String("id", historyRPCRequest.RequestId))
store.log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) if query := historyRPCRequest.Query; query != nil {
logger = logger.With(logging.Filters(query.GetContentFilters()))
}
logger.Info("received query")
historyResponseRPC := &pb.HistoryRPC{} historyResponseRPC := &pb.HistoryRPC{}
historyResponseRPC.RequestId = historyRPCRequest.RequestId historyResponseRPC.RequestId = historyRPCRequest.RequestId
historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query) historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query)
logger = logger.With(zap.Int("messages", len(historyResponseRPC.Response.Messages)))
err = writer.WriteMsg(historyResponseRPC) err = writer.WriteMsg(historyResponseRPC)
if err != nil { if err != nil {
store.log.Error("error writing response", err) logger.Error("writing response", zap.Error(err), logging.PagingInfo(historyResponseRPC.Response.PagingInfo))
_ = s.Reset() _ = s.Reset()
} else { } else {
store.log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) logger.Info("response sent")
} }
} }
@ -491,22 +495,22 @@ func WithPeer(p peer.ID) HistoryRequestOption {
// to request the message history // to request the message history
func WithAutomaticPeerSelection() HistoryRequestOption { func WithAutomaticPeerSelection() HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log.Desugar()) p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
params.s.log.Info("Error selecting peer: ", err) params.s.log.Info("selecting peer", zap.Error(err))
} }
} }
} }
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log.Desugar()) p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
params.s.log.Info("Error selecting peer: ", err) params.s.log.Info("selecting peer", zap.Error(err))
} }
} }
} }
@ -547,17 +551,19 @@ func DefaultOptions() []HistoryRequestOption {
} }
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
store.log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer)) logger := store.log.With(logging.HostID("peer", selectedPeer))
logger.Info("querying message history")
// We connect first so dns4 addresses are resolved (NewStream does not do it) // We connect first so dns4 addresses are resolved (NewStream does not do it)
err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer)) err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer))
if err != nil { if err != nil {
logger.Error("connecting to peer", zap.Error(err))
return nil, err return nil, err
} }
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil { if err != nil {
store.log.Error("Failed to connect to remote peer", err) logger.Error("creating stream to peer", zap.Error(err))
return nil, err return nil, err
} }
@ -573,14 +579,14 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
err = writer.WriteMsg(historyRequest) err = writer.WriteMsg(historyRequest)
if err != nil { if err != nil {
store.log.Error("could not write request", err) logger.Error("writing request", zap.Error(err))
return nil, err return nil, err
} }
historyResponseRPC := &pb.HistoryRPC{} historyResponseRPC := &pb.HistoryRPC{}
err = reader.ReadMsg(historyResponseRPC) err = reader.ReadMsg(historyResponseRPC)
if err != nil { if err != nil {
store.log.Error("could not read response", err) logger.Error("reading response", zap.Error(err))
metrics.RecordStoreError(store.ctx, "decodeRPCFailure") metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
return nil, err return nil, err
} }
@ -705,7 +711,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
resultChan <- result resultChan <- result
return return
} }
store.log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err)) store.log.Error("resuming history", logging.HostID("peer", peer), zap.Error(err))
}() }()
} }
@ -774,9 +780,9 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
} }
if len(peerList) == 0 { if len(peerList) == 0 {
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log.Desugar()) p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log)
if err != nil { if err != nil {
store.log.Info("Error selecting peer: ", err) store.log.Info("selecting peer", zap.Error(err))
return -1, ErrNoPeersAvailable return -1, ErrNoPeersAvailable
} }
@ -785,7 +791,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
messages, err := store.queryLoop(ctx, rpc, peerList) messages, err := store.queryLoop(ctx, rpc, peerList)
if err != nil { if err != nil {
store.log.Error("failed to resume history", err) store.log.Error("resuming history", zap.Error(err))
return -1, ErrFailedToResumeHistory return -1, ErrFailedToResumeHistory
} }
@ -796,7 +802,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
} }
} }
store.log.Info("Retrieved messages since the last online time: ", len(messages)) store.log.Info("retrieved messages since the last online time", zap.Int("messages", len(messages)))
return msgCount, nil return msgCount, nil
} }

View File

@ -21,10 +21,10 @@ func TestStorePersistence(t *testing.T) {
db, err := sqlite.NewDB(":memory:") db, err := sqlite.NewDB(":memory:")
require.NoError(t, err) require.NoError(t, err)
dbStore, err := persistence.NewDBStore(utils.Logger().Sugar(), persistence.WithDB(db)) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db))
require.NoError(t, err) require.NoError(t, err)
s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar()) s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger())
s1.fetchDBRecords(ctx) s1.fetchDBRecords(ctx)
require.Len(t, s1.messageQueue.messages, 0) require.Len(t, s1.messageQueue.messages, 0)
@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) {
_ = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) _ = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar()) s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger())
s2.fetchDBRecords(ctx) s2.fetchDBRecords(ctx)
require.Len(t, s2.messageQueue.messages, 1) require.Len(t, s2.messageQueue.messages, 1)
require.Equal(t, msg, s2.messageQueue.messages[0].msg) require.Equal(t, msg, s2.messageQueue.messages[0].msg)

View File

@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
s1.Start(ctx) s1.Start(ctx)
defer s1.Stop() defer s1.Stop()
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
// Simulate a message has been received via relay protocol // Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
s2.Start(ctx) s2.Start(ctx)
defer s2.Stop() defer s2.Stop()
@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
s1.Start(ctx) s1.Start(ctx)
defer s1.Stop() defer s1.Stop()
@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4))
require.NoError(t, err) require.NoError(t, err)
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
s2.Start(ctx) s2.Start(ctx)
defer s2.Stop() defer s2.Stop()

View File

@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1" topic1 := "1"
pubsubTopic1 := "topic1" pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)} msg.Payload = []byte{byte(i)}
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1" topic1 := "1"
pubsubTopic1 := "topic1" pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{ msg := &pb.WakuMessage{
Payload: []byte{byte(i)}, Payload: []byte{byte(i)},
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
} }
func TestTemporalHistoryQueries(t *testing.T) { func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
var messages []*pb.WakuMessage var messages []*pb.WakuMessage
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {

View File

@ -18,13 +18,13 @@ const WakuSwapID_v200 = protocol.ID("/vac/waku/swap/2.0.0-beta1")
type WakuSwap struct { type WakuSwap struct {
params *SwapParameters params *SwapParameters
log *zap.SugaredLogger log *zap.Logger
Accounting map[string]int Accounting map[string]int
accountingMutex sync.RWMutex accountingMutex sync.RWMutex
} }
func NewWakuSwap(log *zap.SugaredLogger, opts ...SwapOption) *WakuSwap { func NewWakuSwap(log *zap.Logger, opts ...SwapOption) *WakuSwap {
params := &SwapParameters{} params := &SwapParameters{}
optList := DefaultOptions() optList := DefaultOptions()
@ -45,12 +45,13 @@ func (s *WakuSwap) sendCheque(peerId string) {
} }
func (s *WakuSwap) applyPolicy(peerId string) { func (s *WakuSwap) applyPolicy(peerId string) {
logger := s.log.With(zap.String("peer", peerId))
if s.Accounting[peerId] <= s.params.disconnectThreshold { if s.Accounting[peerId] <= s.params.disconnectThreshold {
s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId]) logger.Warn("disconnect threshold reached", zap.Int("value", s.Accounting[peerId]))
} }
if s.Accounting[peerId] >= s.params.paymentThreshold { if s.Accounting[peerId] >= s.params.paymentThreshold {
s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId]) logger.Warn("payment threshold reached", zap.Int("value", s.Accounting[peerId]))
if s.params.mode != HardMode { if s.params.mode != HardMode {
s.sendCheque(peerId) s.sendCheque(peerId)
} }

View File

@ -8,7 +8,7 @@ import (
) )
func TestSwapCreditDebit(t *testing.T) { func TestSwapCreditDebit(t *testing.T) {
swap := NewWakuSwap(utils.Logger().Sugar(), []SwapOption{ swap := NewWakuSwap(utils.Logger(), []SwapOption{
WithMode(SoftMode), WithMode(SoftMode),
WithThreshold(0, 0), WithThreshold(0, 0),
}...) }...)

View File

@ -15,7 +15,7 @@ import (
type AdminService struct { type AdminService struct {
node *node.WakuNode node *node.WakuNode
log *zap.SugaredLogger log *zap.Logger
} }
type GetPeersArgs struct { type GetPeersArgs struct {
@ -39,7 +39,7 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su
for _, peer := range args.Peers { for _, peer := range args.Peers {
addr, err := ma.NewMultiaddr(peer) addr, err := ma.NewMultiaddr(peer)
if err != nil { if err != nil {
a.log.Error("Error building multiaddr", zap.Error(err)) a.log.Error("building multiaddr", zap.Error(err))
reply.Success = false reply.Success = false
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil
@ -47,7 +47,7 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su
err = a.node.DialPeerWithMultiAddress(req.Context(), addr) err = a.node.DialPeerWithMultiAddress(req.Context(), addr)
if err != nil { if err != nil {
a.log.Error("Error dialing peers", zap.Error(err)) a.log.Error("dialing peers", zap.Error(err))
reply.Success = false reply.Success = false
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil
@ -65,7 +65,7 @@ func isWakuProtocol(protocol string) bool {
func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error { func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error {
peers, err := a.node.Peers() peers, err := a.node.Peers()
if err != nil { if err != nil {
a.log.Error("Error getting peers", zap.Error(err)) a.log.Error("getting peers", zap.Error(err))
return nil return nil
} }
for _, peer := range peers { for _, peer := range peers {

View File

@ -24,7 +24,7 @@ func makeAdminService(t *testing.T) *AdminService {
require.NoError(t, err) require.NoError(t, err)
err = n.Start() err = n.Start()
require.NoError(t, err) require.NoError(t, err)
return &AdminService{n, utils.Logger().Sugar()} return &AdminService{n, utils.Logger()}
} }
func TestV1Peers(t *testing.T) { func TestV1Peers(t *testing.T) {
@ -33,7 +33,7 @@ func TestV1Peers(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar()) relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
defer relay.Stop() defer relay.Stop()

View File

@ -14,7 +14,7 @@ import (
type FilterService struct { type FilterService struct {
node *node.WakuNode node *node.WakuNode
log *zap.SugaredLogger log *zap.Logger
messages map[string][]*pb.WakuMessage messages map[string][]*pb.WakuMessage
messagesMutex sync.RWMutex messagesMutex sync.RWMutex
@ -31,7 +31,7 @@ type ContentTopicArgs struct {
ContentTopic string `json:"contentTopic,omitempty"` ContentTopic string `json:"contentTopic,omitempty"`
} }
func NewFilterService(node *node.WakuNode, log *zap.SugaredLogger) *FilterService { func NewFilterService(node *node.WakuNode, log *zap.Logger) *FilterService {
s := &FilterService{ s := &FilterService{
node: node, node: node,
log: log.Named("filter"), log: log.Named("filter"),
@ -80,7 +80,7 @@ func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterConten
filter.WithAutomaticPeerSelection(), filter.WithAutomaticPeerSelection(),
) )
if err != nil { if err != nil {
f.log.Error("Error subscribing to topic:", args.Topic, "err:", err) f.log.Error("subscribing to topic", zap.String("topic", args.Topic), zap.Error(err))
reply.Success = false reply.Success = false
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil
@ -98,7 +98,7 @@ func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterCont
makeContentFilter(args), makeContentFilter(args),
) )
if err != nil { if err != nil {
f.log.Error("Error unsubscribing to topic:", args.Topic, "err:", err) f.log.Error("unsubscribing from topic", zap.String("topic", args.Topic), zap.Error(err))
reply.Success = false reply.Success = false
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil

View File

@ -29,7 +29,7 @@ func makeFilterService(t *testing.T) *FilterService {
_, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err) require.NoError(t, err)
return NewFilterService(n, utils.Logger().Sugar()) return NewFilterService(n, utils.Logger())
} }
func TestFilterSubscription(t *testing.T) { func TestFilterSubscription(t *testing.T) {
@ -39,13 +39,13 @@ func TestFilterSubscription(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar()) node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
_, err = node.SubscribeToTopic(context.Background(), testTopic) _, err = node.SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err) require.NoError(t, err)
_, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar()) _, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger())
d := makeFilterService(t) d := makeFilterService(t)
defer d.node.Stop() defer d.node.Stop()

View File

@ -16,7 +16,7 @@ import (
type PrivateService struct { type PrivateService struct {
node *node.WakuNode node *node.WakuNode
log *zap.SugaredLogger log *zap.Logger
symmetricMessages map[string][]*pb.WakuMessage symmetricMessages map[string][]*pb.WakuMessage
symmetricMessagesMutex sync.RWMutex symmetricMessagesMutex sync.RWMutex
@ -56,7 +56,7 @@ type AsymmetricMessagesArgs struct {
PrivateKey string `json:"privateKey"` PrivateKey string `json:"privateKey"`
} }
func NewPrivateService(node *node.WakuNode, log *zap.SugaredLogger) *PrivateService { func NewPrivateService(node *node.WakuNode, log *zap.Logger) *PrivateService {
return &PrivateService{ return &PrivateService{
node: node, node: node,
symmetricMessages: make(map[string][]*pb.WakuMessage), symmetricMessages: make(map[string][]*pb.WakuMessage),

View File

@ -16,7 +16,7 @@ func makePrivateService(t *testing.T) *PrivateService {
err = n.Start() err = n.Start()
require.NoError(t, err) require.NoError(t, err)
return NewPrivateService(n, utils.Logger().Sugar()) return NewPrivateService(n, utils.Logger())
} }
func TestGetV1SymmetricKey(t *testing.T) { func TestGetV1SymmetricKey(t *testing.T) {

View File

@ -14,7 +14,7 @@ import (
type RelayService struct { type RelayService struct {
node *node.WakuNode node *node.WakuNode
log *zap.SugaredLogger log *zap.Logger
messages map[string][]*pb.WakuMessage messages map[string][]*pb.WakuMessage
messagesMutex sync.RWMutex messagesMutex sync.RWMutex
@ -35,7 +35,7 @@ type TopicArgs struct {
Topic string `json:"topic,omitempty"` Topic string `json:"topic,omitempty"`
} }
func NewRelayService(node *node.WakuNode, log *zap.SugaredLogger) *RelayService { func NewRelayService(node *node.WakuNode, log *zap.Logger) *RelayService {
s := &RelayService{ s := &RelayService{
node: node, node: node,
log: log.Named("relay"), log: log.Named("relay"),
@ -73,7 +73,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
_, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic) _, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic)
} }
if err != nil { if err != nil {
r.log.Error("Error publishing message: ", err) r.log.Error("publishing message", zap.Error(err))
reply.Success = false reply.Success = false
reply.Error = err.Error() reply.Error = err.Error()
} else { } else {
@ -93,7 +93,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
_, err = r.node.Relay().SubscribeToTopic(ctx, topic) _, err = r.node.Relay().SubscribeToTopic(ctx, topic)
} }
if err != nil { if err != nil {
r.log.Error("Error subscribing to topic:", topic, "err:", err) r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
reply.Success = false reply.Success = false
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil
@ -109,7 +109,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
for _, topic := range args.Topics { for _, topic := range args.Topics {
err := r.node.Relay().Unsubscribe(ctx, topic) err := r.node.Relay().Unsubscribe(ctx, topic)
if err != nil { if err != nil {
r.log.Error("Error unsubscribing from topic", topic, "err:", err) r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
reply.Success = false reply.Success = false
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil

View File

@ -19,7 +19,7 @@ func makeRelayService(t *testing.T) *RelayService {
require.NoError(t, err) require.NoError(t, err)
err = n.Start() err = n.Start()
require.NoError(t, err) require.NoError(t, err)
return NewRelayService(n, utils.Logger().Sugar()) return NewRelayService(n, utils.Logger())
} }
func TestPostV1Message(t *testing.T) { func TestPostV1Message(t *testing.T) {

View File

@ -11,7 +11,7 @@ import (
type StoreService struct { type StoreService struct {
node *node.WakuNode node *node.WakuNode
log *zap.SugaredLogger log *zap.Logger
} }
// cursor *pb.Index // cursor *pb.Index
@ -56,7 +56,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,
options..., options...,
) )
if err != nil { if err != nil {
s.log.Error("Error querying messages:", zap.Error(err)) s.log.Error("querying messages", zap.Error(err))
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil
} }

View File

@ -15,7 +15,7 @@ func makeStoreService(t *testing.T) *StoreService {
require.NoError(t, err) require.NoError(t, err)
err = n.Start() err = n.Start()
require.NoError(t, err) require.NoError(t, err)
return &StoreService{n, utils.Logger().Sugar()} return &StoreService{n, utils.Logger()}
} }
func TestStoreGetV1Messages(t *testing.T) { func TestStoreGetV1Messages(t *testing.T) {

View File

@ -15,7 +15,7 @@ type WakuRpc struct {
node *node.WakuNode node *node.WakuNode
server *http.Server server *http.Server
log *zap.SugaredLogger log *zap.Logger
relayService *RelayService relayService *RelayService
filterService *FilterService filterService *FilterService
@ -23,7 +23,7 @@ type WakuRpc struct {
adminService *AdminService adminService *AdminService
} }
func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, log *zap.SugaredLogger) *WakuRpc { func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, log *zap.Logger) *WakuRpc {
wrpc := new(WakuRpc) wrpc := new(WakuRpc)
wrpc.log = log.Named("rpc") wrpc.log = log.Named("rpc")
@ -33,25 +33,25 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
err := s.RegisterService(&DebugService{node}, "Debug") err := s.RegisterService(&DebugService{node}, "Debug")
if err != nil { if err != nil {
wrpc.log.Error(err) wrpc.log.Error("registering debug service", zap.Error(err))
} }
relayService := NewRelayService(node, log) relayService := NewRelayService(node, log)
err = s.RegisterService(relayService, "Relay") err = s.RegisterService(relayService, "Relay")
if err != nil { if err != nil {
wrpc.log.Error(err) wrpc.log.Error("registering relay service", zap.Error(err))
} }
err = s.RegisterService(&StoreService{node, log}, "Store") err = s.RegisterService(&StoreService{node, log}, "Store")
if err != nil { if err != nil {
wrpc.log.Error(err) wrpc.log.Error("registering store service", zap.Error(err))
} }
if enableAdmin { if enableAdmin {
adminService := &AdminService{node, log.Named("admin")} adminService := &AdminService{node, log.Named("admin")}
err = s.RegisterService(adminService, "Admin") err = s.RegisterService(adminService, "Admin")
if err != nil { if err != nil {
wrpc.log.Error(err) wrpc.log.Error("registering admin service", zap.Error(err))
} }
wrpc.adminService = adminService wrpc.adminService = adminService
} }
@ -59,14 +59,14 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
filterService := NewFilterService(node, log) filterService := NewFilterService(node, log)
err = s.RegisterService(filterService, "Filter") err = s.RegisterService(filterService, "Filter")
if err != nil { if err != nil {
wrpc.log.Error(err) wrpc.log.Error("registering filter service", zap.Error(err))
} }
if enablePrivate { if enablePrivate {
privateService := NewPrivateService(node, log) privateService := NewPrivateService(node, log)
err = s.RegisterService(privateService, "Private") err = s.RegisterService(privateService, "Private")
if err != nil { if err != nil {
wrpc.log.Error(err) wrpc.log.Error("registering private service", zap.Error(err))
} }
wrpc.privateService = privateService wrpc.privateService = privateService
} }
@ -75,7 +75,7 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
t := time.Now() t := time.Now()
s.ServeHTTP(w, r) s.ServeHTTP(w, r)
wrpc.log.Infof("RPC request at %s took %s", r.URL.Path, time.Since(t)) wrpc.log.Info("served request", zap.String("path", r.URL.Path), zap.Duration("duration", time.Since(t)))
}) })
listenAddr := fmt.Sprintf("%s:%d", address, port) listenAddr := fmt.Sprintf("%s:%d", address, port)
@ -104,10 +104,10 @@ func (r *WakuRpc) Start() {
go func() { go func() {
_ = r.server.ListenAndServe() _ = r.server.ListenAndServe()
}() }()
r.log.Info("Rpc server started at ", r.server.Addr) r.log.Info("server started", zap.String("addr", r.server.Addr))
} }
func (r *WakuRpc) Stop(ctx context.Context) error { func (r *WakuRpc) Stop(ctx context.Context) error {
r.log.Info("Shutting down rpc server") r.log.Info("shutting down server")
return r.server.Shutdown(ctx) return r.server.Shutdown(ctx)
} }

View File

@ -14,7 +14,7 @@ func TestWakuRpc(t *testing.T) {
n, err := node.New(context.Background(), options) n, err := node.New(context.Background(), options)
require.NoError(t, err) require.NoError(t, err)
rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger().Sugar()) rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger())
require.NotNil(t, rpc.server) require.NotNil(t, rpc.server)
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
} }