mirror of https://github.com/status-im/go-waku.git
feat: structured logging (#242)
This commit is contained in:
parent
d872b28d8f
commit
0c989d3d8c
|
@ -0,0 +1,27 @@
|
|||
# Logging Style Guide
|
||||
|
||||
The goal of the style described here is to yield logs that are amenable to searching and aggregating. Structured logging is the best foundation for that. The log entries should be consistent and predictable to support search efficiency and high fidelity of search results. This style puts forward guidelines that promote this outcome.
|
||||
|
||||
## Log messages
|
||||
|
||||
* Messages should be fixed strings, never interpolate values into the messages. Use log entry fields for values.
|
||||
|
||||
* Message strings should be consistent identification of what action/event was/is happening. Consistent messages makes searching the logs and aggregating correlated events easier.
|
||||
|
||||
* Error messages should look like any other log messages. No need to say "x failed", the log level and error field are sufficient indication of failure.
|
||||
|
||||
## Log entry fields
|
||||
|
||||
* Adding fields to log entries is not free, but the fields are the discriminators that allow distinguishing similar log entries from each other. Insufficient field structure will makes it more difficult to find the entries you are looking for.
|
||||
|
||||
* Create/Use field helpers for commonly used field value types (see logging.go). It promotes consistent formatting and allows changing it easily in one place.
|
||||
|
||||
# Log entry field helpers
|
||||
|
||||
* Make the field creation do as little as possible, i.e. just capture the existing value/object. Postpone any transformation to log emission time by employing generic zap.Stringer, zap.Array, zap.Object fields (see logging.go). It avoids unnecessary transformation for entries that may not even be emitted in the end.
|
||||
|
||||
## Logger management
|
||||
|
||||
* Adorn the logger with fields and reuse the adorned logger rather than repeatedly creating fields with each log entry.
|
||||
|
||||
* Prefer passing the adorned logger down the call chain using Context. It promotes consistent log entry structure, i.e. fields will exist consistently in related entries.
|
|
@ -0,0 +1,20 @@
|
|||
package logging
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var logKey = &struct{}{}
|
||||
|
||||
// From allows retrieving the Logger from a Context
|
||||
func From(ctx context.Context) *zap.Logger {
|
||||
return ctx.Value(logKey).(*zap.Logger)
|
||||
}
|
||||
|
||||
// With associates a Logger with a Context to allow passing
|
||||
// a logger down the call chain.
|
||||
func With(ctx context.Context, log *zap.Logger) context.Context {
|
||||
return context.WithValue(ctx, logKey, log)
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
// logging implements custom logging field types for commonly
|
||||
// logged values like host ID or wallet address.
|
||||
//
|
||||
// implementation purposely does as little as possible at field creation time,
|
||||
// and postpones any transformation to output time by relying on the generic
|
||||
// zap types like zap.Stringer, zap.Array, zap.Object
|
||||
//
|
||||
package logging
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// List of multiaddrs
|
||||
type multiaddrs []multiaddr.Multiaddr
|
||||
|
||||
// MultiAddrs creates a field with an array of multiaddrs
|
||||
func MultiAddrs(key string, addrs ...multiaddr.Multiaddr) zapcore.Field {
|
||||
return zap.Array(key, multiaddrs(addrs))
|
||||
}
|
||||
|
||||
func (addrs multiaddrs) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
|
||||
for _, addr := range addrs {
|
||||
encoder.AppendString(addr.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Host ID/Peer ID
|
||||
type hostID peer.ID
|
||||
|
||||
// HostID creates a field for a peer.ID
|
||||
func HostID(key string, id peer.ID) zapcore.Field {
|
||||
return zap.Stringer(key, hostID(id))
|
||||
}
|
||||
|
||||
func (id hostID) String() string { return peer.Encode(peer.ID(id)) }
|
||||
|
||||
// Time - Waku uses Microsecond Unix Time
|
||||
type timestamp int64
|
||||
|
||||
// Time creates a field for Waku time value
|
||||
func Time(key string, time int64) zapcore.Field {
|
||||
return zap.Stringer(key, timestamp(time))
|
||||
}
|
||||
|
||||
func (t timestamp) String() string {
|
||||
return time.UnixMicro(int64(t)).Format(time.RFC3339)
|
||||
}
|
||||
|
||||
// History Query Filters
|
||||
type historyFilters []*pb.ContentFilter
|
||||
|
||||
// Filters creates a field with an array of history query filters.
|
||||
// The assumption is that log entries won't have more than one of these,
|
||||
// so the field key/name is hardcoded to be "filters" to promote consistency.
|
||||
func Filters(filters []*pb.ContentFilter) zapcore.Field {
|
||||
return zap.Array("filters", historyFilters(filters))
|
||||
}
|
||||
|
||||
func (filters historyFilters) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
|
||||
for _, filter := range filters {
|
||||
encoder.AppendString(filter.ContentTopic)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// History Paging Info
|
||||
// Probably too detailed for normal log levels, but useful for debugging.
|
||||
// Also a good example of nested object value.
|
||||
type pagingInfo pb.PagingInfo
|
||||
type index pb.Index
|
||||
|
||||
// PagingInfo creates a field with history query paging info.
|
||||
func PagingInfo(pi *pb.PagingInfo) zapcore.Field {
|
||||
return zap.Object("paging_info", (*pagingInfo)(pi))
|
||||
}
|
||||
|
||||
func (pi *pagingInfo) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
|
||||
encoder.AddUint64("page_size", pi.PageSize)
|
||||
encoder.AddString("direction", pi.Direction.String())
|
||||
if pi.Cursor != nil {
|
||||
return encoder.AddObject("cursor", (*index)(pi.Cursor))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *index) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
|
||||
encoder.AddBinary("digest", i.Digest)
|
||||
encoder.AddTime("sent", time.UnixMicro(i.SenderTime))
|
||||
encoder.AddTime("received", time.UnixMicro(i.ReceiverTime))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Hex encoded bytes
|
||||
type hexBytes []byte
|
||||
|
||||
// HexBytes creates a field for a byte slice that should be emitted as hex encoded string.
|
||||
func HexBytes(key string, bytes []byte) zap.Field {
|
||||
return zap.Stringer(key, hexBytes(bytes))
|
||||
}
|
||||
|
||||
func (bytes hexBytes) String() string {
|
||||
return hexutil.Encode(bytes)
|
||||
}
|
|
@ -10,7 +10,7 @@ import (
|
|||
)
|
||||
|
||||
func TestStartAndStopMetricsServer(t *testing.T) {
|
||||
server := NewMetricsServer("0.0.0.0", 9876, utils.Logger())
|
||||
server := NewMetricsServer("0.0.0.0", 9876, utils.Logger().Sugar())
|
||||
|
||||
go func() {
|
||||
time.Sleep(1 * time.Second)
|
||||
|
|
38
waku/node.go
38
waku/node.go
|
@ -23,12 +23,14 @@ import (
|
|||
"github.com/libp2p/go-libp2p"
|
||||
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/discovery"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p/config"
|
||||
|
||||
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
rendezvous "github.com/status-im/go-waku-rendezvous"
|
||||
"github.com/status-im/go-waku/logging"
|
||||
"github.com/status-im/go-waku/waku/metrics"
|
||||
"github.com/status-im/go-waku/waku/persistence"
|
||||
"github.com/status-im/go-waku/waku/persistence/sqlite"
|
||||
|
@ -44,9 +46,6 @@ import (
|
|||
|
||||
func failOnErr(err error, msg string) {
|
||||
if err != nil {
|
||||
if msg != "" {
|
||||
msg = msg + ": "
|
||||
}
|
||||
utils.Logger().Fatal(msg, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
@ -86,6 +85,11 @@ func Execute(options Options) {
|
|||
prvKey, err := getPrivKey(options)
|
||||
failOnErr(err, "nodekey error")
|
||||
|
||||
p2pPrvKey := libp2pcrypto.Secp256k1PrivateKey(*prvKey)
|
||||
id, err := peer.IDFromPublicKey((&p2pPrvKey).GetPublic())
|
||||
failOnErr(err, "deriving peer ID from private key")
|
||||
logger := utils.Logger().With(logging.HostID("node", id))
|
||||
|
||||
if options.DBPath == "" && options.UseDB {
|
||||
failOnErr(errors.New("dbpath can't be null"), "")
|
||||
}
|
||||
|
@ -101,12 +105,12 @@ func Execute(options Options) {
|
|||
|
||||
var metricsServer *metrics.Server
|
||||
if options.Metrics.Enable {
|
||||
metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, utils.Logger())
|
||||
metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, logger.Sugar())
|
||||
go metricsServer.Start()
|
||||
}
|
||||
|
||||
nodeOpts := []node.WakuNodeOption{
|
||||
node.WithLogger(utils.Logger().Desugar()),
|
||||
node.WithLogger(logger),
|
||||
node.WithPrivateKey(prvKey),
|
||||
node.WithHostAddress(hostAddr),
|
||||
node.WithKeepAlive(time.Duration(options.KeepAlive) * time.Second),
|
||||
|
@ -186,7 +190,7 @@ func Execute(options Options) {
|
|||
if options.Store.Enable {
|
||||
nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages))
|
||||
if options.UseDB {
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration()))
|
||||
dbStore, err := persistence.NewDBStore(logger.Sugar(), persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration()))
|
||||
failOnErr(err, "DBStore")
|
||||
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
|
||||
} else {
|
||||
|
@ -205,16 +209,16 @@ func Execute(options Options) {
|
|||
var discoveredNodes []dnsdisc.DiscoveredNode
|
||||
if options.DNSDiscovery.Enable {
|
||||
if options.DNSDiscovery.URL != "" {
|
||||
utils.Logger().Info("attempting DNS discovery with ", zap.String("URL", options.DNSDiscovery.URL))
|
||||
logger.Info("attempting DNS discovery with ", zap.String("URL", options.DNSDiscovery.URL))
|
||||
nodes, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver))
|
||||
if err != nil {
|
||||
utils.Logger().Warn("dns discovery error ", zap.Error(err))
|
||||
logger.Warn("dns discovery error ", zap.Error(err))
|
||||
} else {
|
||||
utils.Logger().Info("found dns entries ", zap.Any("qty", len(nodes)))
|
||||
logger.Info("found dns entries ", zap.Any("qty", len(nodes)))
|
||||
discoveredNodes = nodes
|
||||
}
|
||||
} else {
|
||||
utils.Logger().Fatal("DNS discovery URL is required")
|
||||
logger.Fatal("DNS discovery URL is required")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -223,7 +227,7 @@ func Execute(options Options) {
|
|||
for _, addr := range options.DiscV5.Nodes.Value() {
|
||||
bootnode, err := enode.Parse(enode.ValidSchemes, addr)
|
||||
if err != nil {
|
||||
utils.Logger().Fatal("could not parse enr: ", zap.Error(err))
|
||||
logger.Fatal("parsing ENR", zap.Error(err))
|
||||
}
|
||||
bootnodes = append(bootnodes, bootnode)
|
||||
}
|
||||
|
@ -247,12 +251,12 @@ func Execute(options Options) {
|
|||
addPeers(wakuNode, options.Filter.Nodes.Value(), string(filter.FilterID_v20beta1))
|
||||
|
||||
if err = wakuNode.Start(); err != nil {
|
||||
utils.Logger().Fatal(fmt.Errorf("could not start waku node, %w", err).Error())
|
||||
logger.Fatal("starting waku node", zap.Error(err))
|
||||
}
|
||||
|
||||
if options.DiscV5.Enable {
|
||||
if err = wakuNode.DiscV5().Start(); err != nil {
|
||||
utils.Logger().Fatal(fmt.Errorf("could not start discovery v5, %w", err).Error())
|
||||
logger.Fatal("starting discovery v5", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -273,7 +277,7 @@ func Execute(options Options) {
|
|||
go func(node string) {
|
||||
err = wakuNode.DialPeer(ctx, node)
|
||||
if err != nil {
|
||||
utils.Logger().Error("error dialing peer ", zap.Error(err))
|
||||
logger.Error("dialing peer", zap.Error(err))
|
||||
}
|
||||
}(n)
|
||||
}
|
||||
|
@ -286,7 +290,7 @@ func Execute(options Options) {
|
|||
defer cancel()
|
||||
err = wakuNode.DialPeerWithMultiAddress(ctx, m)
|
||||
if err != nil {
|
||||
utils.Logger().Error("error dialing peer ", zap.Error(err))
|
||||
logger.Error("dialing peer ", zap.Error(err))
|
||||
}
|
||||
}(ctx, m)
|
||||
}
|
||||
|
@ -295,7 +299,7 @@ func Execute(options Options) {
|
|||
|
||||
var rpcServer *rpc.WakuRpc
|
||||
if options.RPCServer.Enable {
|
||||
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, utils.Logger())
|
||||
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger.Sugar())
|
||||
rpcServer.Start()
|
||||
}
|
||||
|
||||
|
@ -305,7 +309,7 @@ func Execute(options Options) {
|
|||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-ch
|
||||
utils.Logger().Info("Received signal, shutting down...")
|
||||
logger.Info("Received signal, shutting down...")
|
||||
|
||||
// shut the node down
|
||||
wakuNode.Stop()
|
||||
|
|
|
@ -10,12 +10,13 @@ import (
|
|||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/status-im/go-waku/waku/v2/utils"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func NewMock() *sql.DB {
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
if err != nil {
|
||||
utils.Logger().Fatalf("an error '%s' was not expected when opening a stub database connection", err)
|
||||
utils.Logger().Fatal("opening a stub database connection", zap.Error(err))
|
||||
}
|
||||
|
||||
return db
|
||||
|
@ -32,7 +33,7 @@ func createIndex(digest []byte, receiverTime int64) *pb.Index {
|
|||
func TestDbStore(t *testing.T) {
|
||||
db := NewMock()
|
||||
option := WithDB(db)
|
||||
store, err := NewDBStore(utils.Logger(), option)
|
||||
store, err := NewDBStore(utils.Logger().Sugar(), option)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := store.GetAll()
|
||||
|
@ -53,7 +54,7 @@ func TestDbStore(t *testing.T) {
|
|||
|
||||
func TestStoreRetention(t *testing.T) {
|
||||
db := NewMock()
|
||||
store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second))
|
||||
store, err := NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 20*time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
insertTime := time.Now()
|
||||
|
@ -73,7 +74,7 @@ func TestStoreRetention(t *testing.T) {
|
|||
|
||||
// This step simulates starting go-waku again from scratch
|
||||
|
||||
store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second))
|
||||
store, err = NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 40*time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
dbResults, err = store.GetAll()
|
||||
|
|
|
@ -47,19 +47,19 @@ func TestDiscV5(t *testing.T) {
|
|||
host1, _, prvKey1 := createHost(t)
|
||||
udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3)
|
||||
require.NoError(t, err)
|
||||
d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort1))
|
||||
d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort1))
|
||||
require.NoError(t, err)
|
||||
|
||||
host2, _, prvKey2 := createHost(t)
|
||||
udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3)
|
||||
require.NoError(t, err)
|
||||
d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
|
||||
d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
|
||||
require.NoError(t, err)
|
||||
|
||||
host3, _, prvKey3 := createHost(t)
|
||||
udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3)
|
||||
require.NoError(t, err)
|
||||
d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
|
||||
d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
|
||||
require.NoError(t, err)
|
||||
|
||||
defer d1.Stop()
|
||||
|
|
|
@ -2,12 +2,12 @@ package node
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/status-im/go-waku/logging"
|
||||
"github.com/status-im/go-waku/waku/v2/metrics"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
|
||||
|
@ -33,12 +33,12 @@ type ConnStatus struct {
|
|||
type ConnectionNotifier struct {
|
||||
h host.Host
|
||||
ctx context.Context
|
||||
log *zap.SugaredLogger
|
||||
log *zap.Logger
|
||||
DisconnectChan chan peer.ID
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.SugaredLogger) ConnectionNotifier {
|
||||
func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.Logger) ConnectionNotifier {
|
||||
return ConnectionNotifier{
|
||||
h: h,
|
||||
ctx: ctx,
|
||||
|
@ -58,13 +58,13 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m ma.Multiaddr) {
|
|||
|
||||
// Connected is called when a connection is opened
|
||||
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
|
||||
c.log.Info(fmt.Sprintf("Peer %s connected", cc.RemotePeer()))
|
||||
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()))
|
||||
stats.Record(c.ctx, metrics.Peers.M(1))
|
||||
}
|
||||
|
||||
// Disconnected is called when a connection closed
|
||||
func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) {
|
||||
c.log.Info(fmt.Sprintf("Peer %s disconnected", cc.RemotePeer()))
|
||||
c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer()))
|
||||
stats.Record(c.ctx, metrics.Peers.M(-1))
|
||||
c.DisconnectChan <- cc.RemotePeer()
|
||||
}
|
||||
|
@ -117,7 +117,7 @@ func (w *WakuNode) Status() (isOnline bool, hasHistory bool) {
|
|||
for _, peer := range w.host.Network().Peers() {
|
||||
protocols, err := w.host.Peerstore().GetProtocols(peer)
|
||||
if err != nil {
|
||||
w.log.Warn(fmt.Errorf("could not read peer %s protocols", peer))
|
||||
w.log.Warn("reading peer protocols", logging.HostID("peer", peer), zap.Error(err))
|
||||
}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
|
|
|
@ -2,12 +2,13 @@ package node
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/status-im/go-waku/logging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const maxAllowedPingFailures = 2
|
||||
|
@ -19,7 +20,7 @@ const maxPublishAttempt = 5
|
|||
func (w *WakuNode) startKeepAlive(t time.Duration) {
|
||||
go func() {
|
||||
defer w.wg.Done()
|
||||
w.log.Info("Setting up ping protocol with duration of ", t)
|
||||
w.log.Info("setting up ping protocol", zap.Duration("duration", t))
|
||||
ticker := time.NewTicker(t)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
@ -52,25 +53,26 @@ func (w *WakuNode) pingPeer(peer peer.ID) {
|
|||
ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
w.log.Debug("Pinging ", peer)
|
||||
logger := w.log.With(logging.HostID("peer", peer))
|
||||
logger.Debug("pinging")
|
||||
pr := ping.Ping(ctx, w.host, peer)
|
||||
select {
|
||||
case res := <-pr:
|
||||
if res.Error != nil {
|
||||
w.keepAliveFails[peer]++
|
||||
w.log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
|
||||
logger.Debug("could not ping", zap.Error(res.Error))
|
||||
} else {
|
||||
w.keepAliveFails[peer] = 0
|
||||
}
|
||||
case <-ctx.Done():
|
||||
w.keepAliveFails[peer]++
|
||||
w.log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
|
||||
logger.Debug("could not ping (context done)", zap.Error(ctx.Err()))
|
||||
}
|
||||
|
||||
if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected {
|
||||
w.log.Info("Disconnecting peer ", peer)
|
||||
logger.Info("disconnecting peer")
|
||||
if err := w.host.Network().ClosePeer(peer); err != nil {
|
||||
w.log.Debug(fmt.Sprintf("Could not close conn to peer %s: %s", peer, err))
|
||||
logger.Debug("closing conn to peer", zap.Error(err))
|
||||
}
|
||||
w.keepAliveFails[peer] = 0
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -24,6 +23,7 @@ import (
|
|||
"go.opencensus.io/stats"
|
||||
|
||||
rendezvous "github.com/status-im/go-waku-rendezvous"
|
||||
"github.com/status-im/go-waku/logging"
|
||||
"github.com/status-im/go-waku/waku/try"
|
||||
v2 "github.com/status-im/go-waku/waku/v2"
|
||||
"github.com/status-im/go-waku/waku/v2/discv5"
|
||||
|
@ -49,7 +49,7 @@ type storeFactory func(w *WakuNode) store.Store
|
|||
type WakuNode struct {
|
||||
host host.Host
|
||||
opts *WakuNodeParameters
|
||||
log *zap.SugaredLogger
|
||||
log *zap.Logger
|
||||
|
||||
relay *relay.WakuRelay
|
||||
filter *filter.WakuFilter
|
||||
|
@ -86,7 +86,7 @@ type WakuNode struct {
|
|||
}
|
||||
|
||||
func defaultStoreFactory(w *WakuNode) store.Store {
|
||||
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
|
||||
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar())
|
||||
}
|
||||
|
||||
// New is used to instantiate a WakuNode using a set of WakuNodeOptions
|
||||
|
@ -193,19 +193,19 @@ func (w *WakuNode) onAddrChange() {
|
|||
for m := range w.addrChan {
|
||||
ipStr, err := m.ValueForProtocol(ma.P_IP4)
|
||||
if err != nil {
|
||||
w.log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error()))
|
||||
w.log.Error("extracting ip from ma", logging.MultiAddrs("ma", m), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
portStr, err := m.ValueForProtocol(ma.P_TCP)
|
||||
if err != nil {
|
||||
w.log.Error(fmt.Sprintf("could not extract port from ma %s: %s", m, err.Error()))
|
||||
w.log.Error("extracting port from ma", logging.MultiAddrs("ma", m), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
port, err := strconv.Atoi(portStr)
|
||||
if err != nil {
|
||||
w.log.Error(fmt.Sprintf("could not convert port to int: %s", err.Error()))
|
||||
w.log.Error("converting port to int", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -218,7 +218,7 @@ func (w *WakuNode) onAddrChange() {
|
|||
if w.opts.enableDiscV5 {
|
||||
err := w.discoveryV5.UpdateAddr(addr)
|
||||
if err != nil {
|
||||
w.log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s:%d %s", addr.IP, addr.Port, err.Error()))
|
||||
w.log.Error("updating DiscV5 address with IP", zap.Stringer("ip", addr.IP), zap.Int("port", addr.Port), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -227,15 +227,15 @@ func (w *WakuNode) onAddrChange() {
|
|||
}
|
||||
|
||||
func (w *WakuNode) logAddress(addr ma.Multiaddr) {
|
||||
w.log.Info("Listening on ", addr)
|
||||
logger := w.log.With(logging.MultiAddrs("multiaddr", addr))
|
||||
|
||||
// TODO: make this optional depending on DNS Disc being enabled
|
||||
if w.opts.privKey != nil {
|
||||
enr, ip, err := utils.GetENRandIP(addr, w.wakuFlag, w.opts.privKey)
|
||||
if err != nil {
|
||||
w.log.Error("could not obtain ENR record from multiaddress", err)
|
||||
logger.Error("obtaining ENR record from multiaddress", zap.Error(err))
|
||||
} else {
|
||||
w.log.Info(fmt.Sprintf("DNS: discoverable ENR for IP %s: %s", ip, enr))
|
||||
logger.Info("listening", zap.Stringer("ENR", enr), zap.Stringer("ip", ip))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -281,9 +281,9 @@ func (w *WakuNode) checkForAddressChanges() {
|
|||
|
||||
// Start initializes all the protocols that were setup in the WakuNode
|
||||
func (w *WakuNode) Start() error {
|
||||
w.log.Info("Version details ", "commit=", GitCommit)
|
||||
w.log.Info("Version details ", zap.String("commit", GitCommit))
|
||||
|
||||
w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{
|
||||
w.swap = swap.NewWakuSwap(w.log.Sugar(), []swap.SwapOption{
|
||||
swap.WithMode(w.opts.swapMode),
|
||||
swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold),
|
||||
}...)
|
||||
|
@ -294,7 +294,7 @@ func (w *WakuNode) Start() error {
|
|||
}
|
||||
|
||||
if w.opts.enableFilter {
|
||||
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...)
|
||||
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log.Sugar(), w.opts.filterOpts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -322,7 +322,7 @@ func (w *WakuNode) Start() error {
|
|||
return err
|
||||
}
|
||||
|
||||
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log)
|
||||
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log.Sugar())
|
||||
if w.opts.enableLightPush {
|
||||
if err := w.lightPush.Start(); err != nil {
|
||||
return err
|
||||
|
@ -446,11 +446,11 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
|
|||
if !w.lightPush.IsStarted() {
|
||||
err = errors.New("not enought peers for relay and lightpush is not yet started")
|
||||
} else {
|
||||
w.log.Debug("publishing message via lightpush", hexutil.Encode(hash))
|
||||
w.log.Debug("publishing message via lightpush", logging.HexBytes("hash", hash))
|
||||
_, err = w.Lightpush().Publish(ctx, msg)
|
||||
}
|
||||
} else {
|
||||
w.log.Debug("publishing message via relay", hexutil.Encode(hash))
|
||||
w.log.Debug("publishing message via relay", logging.HexBytes("hash", hash))
|
||||
_, err = w.Relay().Publish(ctx, msg)
|
||||
}
|
||||
|
||||
|
@ -462,7 +462,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
|
|||
|
||||
func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error {
|
||||
var err error
|
||||
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...)
|
||||
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log.Sugar(), opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -492,7 +492,7 @@ func (w *WakuNode) mountDiscV5() error {
|
|||
}
|
||||
|
||||
var err error
|
||||
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log, discV5Options...)
|
||||
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log.Sugar(), discV5Options...)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -549,7 +549,7 @@ func (w *WakuNode) startStore() {
|
|||
}
|
||||
|
||||
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error {
|
||||
w.log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty()))
|
||||
w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID))
|
||||
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
|
||||
err := w.host.Peerstore().AddProtocols(info.ID, protocols...)
|
||||
if err != nil {
|
||||
|
|
|
@ -47,7 +47,7 @@ type WakuNodeParameters struct {
|
|||
wssPort int
|
||||
tlsConfig *tls.Config
|
||||
|
||||
logger *zap.SugaredLogger
|
||||
logger *zap.Logger
|
||||
|
||||
enableRelay bool
|
||||
enableFilter bool
|
||||
|
@ -92,7 +92,7 @@ type WakuNodeOption func(*WakuNodeParameters) error
|
|||
|
||||
// Default options used in the libp2p node
|
||||
var DefaultWakuNodeOptions = []WakuNodeOption{
|
||||
WithLogger(utils.Logger().Desugar()),
|
||||
WithLogger(utils.Logger()),
|
||||
WithWakuRelay(),
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory {
|
|||
// WithLogger is a WakuNodeOption that adds a custom logger
|
||||
func WithLogger(l *zap.Logger) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.logger = l.Sugar()
|
||||
params.logger = l
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ func TestWakuOptions(t *testing.T) {
|
|||
advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
|
||||
|
||||
storeFactory := func(w *WakuNode) store.Store {
|
||||
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
|
||||
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar())
|
||||
}
|
||||
|
||||
options := []WakuNodeOption{
|
||||
|
|
|
@ -40,7 +40,7 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
|
|||
|
||||
func WithAutomaticPeerSelection() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log)
|
||||
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log.Desugar())
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
|
@ -51,7 +51,7 @@ func WithAutomaticPeerSelection() FilterSubscribeOption {
|
|||
|
||||
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log)
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log.Desugar())
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
|
|
|
@ -25,7 +25,7 @@ func TestFilterOption(t *testing.T) {
|
|||
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.host = host
|
||||
params.log = utils.Logger()
|
||||
params.log = utils.Logger().Sugar()
|
||||
|
||||
for _, opt := range options {
|
||||
opt(params)
|
||||
|
|
|
@ -23,7 +23,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel
|
|||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger())
|
||||
relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger().Sugar())
|
||||
require.NoError(t, err)
|
||||
|
||||
sub, err := relay.SubscribeToTopic(context.Background(), topic)
|
||||
|
@ -39,7 +39,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
|
|||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger())
|
||||
filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar())
|
||||
|
||||
return filter, host
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func TestWakuFilter(t *testing.T) {
|
|||
defer node2.Stop()
|
||||
defer sub2.Unsubscribe()
|
||||
|
||||
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger())
|
||||
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar())
|
||||
broadcaster.Register(&testTopic, node2Filter.MsgC)
|
||||
|
||||
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||
|
@ -154,7 +154,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
|
|||
defer node2.Stop()
|
||||
defer sub2.Unsubscribe()
|
||||
|
||||
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger(), WithTimeout(3*time.Second))
|
||||
node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar(), WithTimeout(3*time.Second))
|
||||
broadcaster.Register(&testTopic, node2Filter.MsgC)
|
||||
|
||||
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||
|
|
|
@ -27,7 +27,7 @@ func WithPeer(p peer.ID) LightPushOption {
|
|||
|
||||
func WithAutomaticPeerSelection(host host.Host) LightPushOption {
|
||||
return func(params *LightPushParameters) {
|
||||
p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log)
|
||||
p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log.Desugar())
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
|
@ -38,7 +38,7 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption {
|
|||
|
||||
func WithFastestPeerSelection(ctx context.Context) LightPushOption {
|
||||
return func(params *LightPushParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log)
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log.Desugar())
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestLightPushOption(t *testing.T) {
|
|||
|
||||
params := new(LightPushParameters)
|
||||
params.host = host
|
||||
params.log = utils.Logger()
|
||||
params.log = utils.Logger().Sugar()
|
||||
|
||||
for _, opt := range options {
|
||||
opt(params)
|
||||
|
|
|
@ -25,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
|
|||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger())
|
||||
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar())
|
||||
require.NoError(t, err)
|
||||
|
||||
sub, err := relay.SubscribeToTopic(context.Background(), topic)
|
||||
|
@ -56,7 +56,7 @@ func TestWakuLightPush(t *testing.T) {
|
|||
defer sub2.Unsubscribe()
|
||||
|
||||
ctx := context.Background()
|
||||
lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger())
|
||||
lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger().Sugar())
|
||||
err := lightPushNode2.Start()
|
||||
require.NoError(t, err)
|
||||
defer lightPushNode2.Stop()
|
||||
|
@ -66,7 +66,7 @@ func TestWakuLightPush(t *testing.T) {
|
|||
|
||||
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar())
|
||||
|
||||
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
||||
err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200))
|
||||
|
@ -122,7 +122,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
|
|||
|
||||
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar())
|
||||
err = client.Start()
|
||||
|
||||
require.Errorf(t, err, "relay is required")
|
||||
|
@ -136,7 +136,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
|
|||
|
||||
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger())
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar())
|
||||
|
||||
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic)
|
||||
require.Errorf(t, err, "no suitable remote peers")
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestWakuRelay(t *testing.T) {
|
|||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger())
|
||||
relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar())
|
||||
defer relay.Stop()
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
|
|||
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
||||
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
_ = s.storeMessage(msg1)
|
||||
_ = s.storeMessage(msg3)
|
||||
_ = s.storeMessage(msg5)
|
||||
|
@ -38,7 +38,7 @@ func TestResume(t *testing.T) {
|
|||
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
|
@ -56,7 +56,7 @@ func TestResume(t *testing.T) {
|
|||
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
|
@ -88,7 +88,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
|||
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
|
@ -99,7 +99,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
|||
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
|
@ -121,7 +121,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
|||
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
|
@ -132,7 +132,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
|||
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
|
|
|
@ -491,7 +491,7 @@ func WithPeer(p peer.ID) HistoryRequestOption {
|
|||
// to request the message history
|
||||
func WithAutomaticPeerSelection() HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log)
|
||||
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log.Desugar())
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
|
@ -502,7 +502,7 @@ func WithAutomaticPeerSelection() HistoryRequestOption {
|
|||
|
||||
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log)
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log.Desugar())
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
|
@ -774,7 +774,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
|
|||
}
|
||||
|
||||
if len(peerList) == 0 {
|
||||
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log)
|
||||
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log.Desugar())
|
||||
if err != nil {
|
||||
store.log.Info("Error selecting peer: ", err)
|
||||
return -1, ErrNoPeersAvailable
|
||||
|
|
|
@ -21,10 +21,10 @@ func TestStorePersistence(t *testing.T) {
|
|||
db, err := sqlite.NewDB(":memory:")
|
||||
require.NoError(t, err)
|
||||
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db))
|
||||
dbStore, err := persistence.NewDBStore(utils.Logger().Sugar(), persistence.WithDB(db))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger())
|
||||
s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar())
|
||||
s1.fetchDBRecords(ctx)
|
||||
require.Len(t, s1.messageQueue.messages, 0)
|
||||
|
||||
|
@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) {
|
|||
|
||||
_ = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
||||
|
||||
s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger())
|
||||
s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar())
|
||||
s2.fetchDBRecords(ctx)
|
||||
require.Len(t, s2.messageQueue.messages, 1)
|
||||
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
|||
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
|
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
|||
// Simulate a message has been received via relay protocol
|
||||
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
|
@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
|||
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger())
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
|
@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
|||
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger())
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
|
|||
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
||||
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||
|
||||
|
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
|
|||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||
|
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
|
|||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||
|
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
|
|||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||
|
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
|
|||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
||||
_ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
||||
|
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
|
|||
topic1 := "1"
|
||||
pubsubTopic1 := "topic1"
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
for i := 0; i < 10; i++ {
|
||||
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
||||
msg.Payload = []byte{byte(i)}
|
||||
|
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
|||
topic1 := "1"
|
||||
pubsubTopic1 := "topic1"
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
for i := 0; i < 10; i++ {
|
||||
msg := &pb.WakuMessage{
|
||||
Payload: []byte{byte(i)},
|
||||
|
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTemporalHistoryQueries(t *testing.T) {
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger())
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar())
|
||||
|
||||
var messages []*pb.WakuMessage
|
||||
for i := 0; i < 10; i++ {
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
)
|
||||
|
||||
func TestSwapCreditDebit(t *testing.T) {
|
||||
swap := NewWakuSwap(utils.Logger(), []SwapOption{
|
||||
swap := NewWakuSwap(utils.Logger().Sugar(), []SwapOption{
|
||||
WithMode(SoftMode),
|
||||
WithThreshold(0, 0),
|
||||
}...)
|
||||
|
|
|
@ -24,7 +24,7 @@ func makeAdminService(t *testing.T) *AdminService {
|
|||
require.NoError(t, err)
|
||||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
return &AdminService{n, utils.Logger()}
|
||||
return &AdminService{n, utils.Logger().Sugar()}
|
||||
}
|
||||
|
||||
func TestV1Peers(t *testing.T) {
|
||||
|
@ -33,7 +33,7 @@ func TestV1Peers(t *testing.T) {
|
|||
|
||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger())
|
||||
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar())
|
||||
require.NoError(t, err)
|
||||
defer relay.Stop()
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ func makeFilterService(t *testing.T) *FilterService {
|
|||
_, err = n.Relay().SubscribeToTopic(context.Background(), testTopic)
|
||||
require.NoError(t, err)
|
||||
|
||||
return NewFilterService(n, utils.Logger())
|
||||
return NewFilterService(n, utils.Logger().Sugar())
|
||||
}
|
||||
|
||||
func TestFilterSubscription(t *testing.T) {
|
||||
|
@ -39,13 +39,13 @@ func TestFilterSubscription(t *testing.T) {
|
|||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger())
|
||||
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = node.SubscribeToTopic(context.Background(), testTopic)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger())
|
||||
_, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar())
|
||||
|
||||
d := makeFilterService(t)
|
||||
defer d.node.Stop()
|
||||
|
|
|
@ -16,7 +16,7 @@ func makePrivateService(t *testing.T) *PrivateService {
|
|||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
return NewPrivateService(n, utils.Logger())
|
||||
return NewPrivateService(n, utils.Logger().Sugar())
|
||||
}
|
||||
|
||||
func TestGetV1SymmetricKey(t *testing.T) {
|
||||
|
|
|
@ -19,7 +19,7 @@ func makeRelayService(t *testing.T) *RelayService {
|
|||
require.NoError(t, err)
|
||||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
return NewRelayService(n, utils.Logger())
|
||||
return NewRelayService(n, utils.Logger().Sugar())
|
||||
}
|
||||
|
||||
func TestPostV1Message(t *testing.T) {
|
||||
|
|
|
@ -15,7 +15,7 @@ func makeStoreService(t *testing.T) *StoreService {
|
|||
require.NoError(t, err)
|
||||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
return &StoreService{n, utils.Logger()}
|
||||
return &StoreService{n, utils.Logger().Sugar()}
|
||||
}
|
||||
|
||||
func TestStoreGetV1Messages(t *testing.T) {
|
||||
|
|
|
@ -14,7 +14,7 @@ func TestWakuRpc(t *testing.T) {
|
|||
n, err := node.New(context.Background(), options)
|
||||
require.NoError(t, err)
|
||||
|
||||
rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger())
|
||||
rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger().Sugar())
|
||||
require.NotNil(t, rpc.server)
|
||||
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ func SetLogLevel(level string) error {
|
|||
}
|
||||
|
||||
// Logger creates a zap.Logger with some reasonable defaults
|
||||
func Logger() *zap.SugaredLogger {
|
||||
func Logger() *zap.Logger {
|
||||
if log == nil {
|
||||
cfg := zap.Config{
|
||||
Encoding: "console",
|
||||
|
@ -45,5 +45,5 @@ func Logger() *zap.SugaredLogger {
|
|||
|
||||
log = logger.Named("gowaku")
|
||||
}
|
||||
return log.Sugar()
|
||||
return log
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/status-im/go-waku/logging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -18,7 +19,7 @@ import (
|
|||
var ErrNoPeersAvailable = errors.New("no suitable peers found")
|
||||
|
||||
// SelectPeer is used to return a random peer that supports a given protocol.
|
||||
func SelectPeer(host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) {
|
||||
func SelectPeer(host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) {
|
||||
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
|
||||
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
||||
// This will require us to check for various factors such as:
|
||||
|
@ -29,7 +30,7 @@ func SelectPeer(host host.Host, protocolId string, log *zap.SugaredLogger) (*pee
|
|||
for _, peer := range host.Peerstore().Peers() {
|
||||
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
|
||||
if err != nil {
|
||||
log.Error("error obtaining the protocols supported by peers", zap.Error(err))
|
||||
log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -52,7 +53,7 @@ type pingResult struct {
|
|||
}
|
||||
|
||||
// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
|
||||
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) {
|
||||
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) {
|
||||
var peers peer.IDSlice
|
||||
for _, peer := range host.Peerstore().Peers() {
|
||||
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
|
||||
|
|
Loading…
Reference in New Issue