mirror of
https://github.com/logos-messaging/go-discover.git
synced 2026-01-02 13:03:12 +00:00
feat: use zap logger
This commit is contained in:
parent
fe708039d5
commit
5bb26bd08e
@ -20,8 +20,9 @@ import (
|
||||
"crypto/ecdsa"
|
||||
"net"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/mclock"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||
@ -48,7 +49,7 @@ type Config struct {
|
||||
NetRestrict *netutil.Netlist // list of allowed IP networks
|
||||
Bootnodes []*enode.Node // list of bootstrap nodes
|
||||
Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
|
||||
Log log.Logger // if set, log messages go here
|
||||
Log *zap.Logger // if set, log messages go here
|
||||
|
||||
// V5ProtocolID configures the discv5 protocol identifier.
|
||||
V5ProtocolID *[6]byte
|
||||
@ -59,7 +60,7 @@ type Config struct {
|
||||
|
||||
func (cfg Config) withDefaults() Config {
|
||||
if cfg.Log == nil {
|
||||
cfg.Log = log.Root()
|
||||
cfg.Log = zap.NewNop()
|
||||
}
|
||||
if cfg.ValidSchemes == nil {
|
||||
cfg.ValidSchemes = enode.ValidSchemes
|
||||
|
||||
@ -151,12 +151,9 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
|
||||
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
|
||||
// Remove the node from the local table if it fails to return anything useful too
|
||||
// many times, but only if there are enough other nodes in the bucket.
|
||||
dropped := false
|
||||
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
|
||||
dropped = true
|
||||
it.tab.delete(n)
|
||||
}
|
||||
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
|
||||
} else if fails > 0 {
|
||||
// Reset failure counter because it counts _consecutive_ failures.
|
||||
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
|
||||
|
||||
@ -32,8 +32,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||
)
|
||||
@ -71,7 +72,7 @@ type Table struct {
|
||||
rand *mrand.Rand // source of randomness, periodically reseeded
|
||||
ips netutil.DistinctNetSet
|
||||
|
||||
log log.Logger
|
||||
log *zap.Logger
|
||||
db *enode.DB // database of known nodes
|
||||
net transport
|
||||
refreshReq chan chan struct{}
|
||||
@ -99,26 +100,32 @@ type bucket struct {
|
||||
ips netutil.DistinctNetSet
|
||||
}
|
||||
|
||||
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
|
||||
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, logger *zap.Logger) (*Table, error) {
|
||||
buckets := [nBuckets]*bucket{}
|
||||
for i := range buckets {
|
||||
buckets[i] = &bucket{
|
||||
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
|
||||
}
|
||||
}
|
||||
nursery := wrapNodes(bootnodes)
|
||||
|
||||
tab := &Table{
|
||||
net: t,
|
||||
mutex: sync.Mutex{},
|
||||
buckets: buckets,
|
||||
nursery: nursery,
|
||||
rand: mrand.New(mrand.NewSource(time.Now().UnixNano())),
|
||||
ips: netutil.DistinctNetSet{},
|
||||
log: logger,
|
||||
db: db,
|
||||
net: t,
|
||||
refreshReq: make(chan chan struct{}),
|
||||
initDone: make(chan struct{}),
|
||||
closeReq: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
rand: mrand.New(mrand.NewSource(0)),
|
||||
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
|
||||
log: log,
|
||||
}
|
||||
if err := tab.setFallbackNodes(bootnodes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range tab.buckets {
|
||||
tab.buckets[i] = &bucket{
|
||||
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
|
||||
}
|
||||
}
|
||||
tab.seedRand()
|
||||
tab.loadSeedNodes()
|
||||
|
||||
@ -309,8 +316,6 @@ func (tab *Table) loadSeedNodes() {
|
||||
tab.mutex.Unlock()
|
||||
for i := range seeds {
|
||||
seed := seeds[i]
|
||||
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
|
||||
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
|
||||
tab.addSeenNode(seed)
|
||||
}
|
||||
}
|
||||
@ -329,32 +334,41 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
|
||||
// Ping the selected node and wait for a pong.
|
||||
remoteSeq, err := tab.net.ping(unwrapNode(last))
|
||||
|
||||
logger := tab.log.With(zap.String("id", last.ID().String()), zap.String("addr", last.addr().String()))
|
||||
|
||||
// Also fetch record if the node replied and returned a higher sequence number.
|
||||
if last.Seq() < remoteSeq {
|
||||
n, err := tab.net.RequestENR(unwrapNode(last))
|
||||
if err != nil {
|
||||
tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err)
|
||||
logger.Debug("ENR request failed", zap.Error(err))
|
||||
} else {
|
||||
last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks}
|
||||
}
|
||||
}
|
||||
|
||||
logger = logger.With(zap.Int("b", bi))
|
||||
|
||||
tab.mutex.Lock()
|
||||
defer tab.mutex.Unlock()
|
||||
b := tab.buckets[bi]
|
||||
if err == nil {
|
||||
// The node responded, move it to the front.
|
||||
last.livenessChecks++
|
||||
tab.log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
|
||||
logger.Debug("Revalidated node", zap.Uint("checks", last.livenessChecks))
|
||||
tab.bumpInBucket(b, last)
|
||||
return
|
||||
}
|
||||
|
||||
// No reply received, pick a replacement or delete the node if there aren't
|
||||
// any replacements.
|
||||
if r := tab.replace(b, last); r != nil {
|
||||
tab.log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP())
|
||||
tab.log.Debug("Replaced dead node",
|
||||
zap.Uint("checks", last.livenessChecks),
|
||||
zap.String("r", r.ID().String()),
|
||||
zap.String("rip", r.IP().String()))
|
||||
} else {
|
||||
tab.log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks)
|
||||
tab.log.Debug("Removed dead node",
|
||||
zap.Uint("checks", last.livenessChecks))
|
||||
}
|
||||
}
|
||||
|
||||
@ -552,11 +566,11 @@ func (tab *Table) addIP(b *bucket, ip net.IP) bool {
|
||||
return true
|
||||
}
|
||||
if !tab.ips.Add(ip) {
|
||||
tab.log.Debug("IP exceeds table limit", "ip", ip)
|
||||
tab.log.Debug("IP exceeds table limit", zap.Stringer("ip", ip))
|
||||
return false
|
||||
}
|
||||
if !b.ips.Add(ip) {
|
||||
tab.log.Debug("IP exceeds bucket limit", "ip", ip)
|
||||
tab.log.Debug("IP exceeds bucket limit", zap.Stringer("ip", ip))
|
||||
tab.ips.Remove(ip)
|
||||
return false
|
||||
}
|
||||
|
||||
@ -27,8 +27,9 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
)
|
||||
@ -43,7 +44,11 @@ func init() {
|
||||
|
||||
func newTestTable(t transport) (*Table, *enode.DB) {
|
||||
db, _ := enode.OpenDB("")
|
||||
tab, _ := newTable(t, db, nil, log.Root())
|
||||
logger, err := zap.NewDevelopment()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
tab, _ := newTable(t, db, nil, logger)
|
||||
go tab.loop()
|
||||
return tab, db
|
||||
}
|
||||
|
||||
@ -29,8 +29,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||
"github.com/waku-org/go-discover/discover/v4wire"
|
||||
@ -66,7 +67,7 @@ const (
|
||||
// UDPv4 implements the v4 wire protocol.
|
||||
type UDPv4 struct {
|
||||
conn UDPConn
|
||||
log log.Logger
|
||||
log *zap.Logger
|
||||
netrestrict *netutil.Netlist
|
||||
priv *ecdsa.PrivateKey
|
||||
localNode *enode.LocalNode
|
||||
@ -311,7 +312,6 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
|
||||
nreceived++
|
||||
n, err := t.nodeFromRPC(toaddr, rn)
|
||||
if err != nil {
|
||||
t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
|
||||
continue
|
||||
}
|
||||
nodes = append(nodes, n)
|
||||
@ -506,7 +506,6 @@ func (t *UDPv4) send(toaddr *net.UDPAddr, toid enode.ID, req v4wire.Packet) ([]b
|
||||
|
||||
func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet []byte) error {
|
||||
_, err := t.conn.WriteToUDP(packet, toaddr)
|
||||
t.log.Trace(">> "+what, "id", toid, "addr", toaddr, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -522,12 +521,12 @@ func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) {
|
||||
nbytes, from, err := t.conn.ReadFromUDP(buf)
|
||||
if netutil.IsTemporaryError(err) {
|
||||
// Ignore temporary read errors.
|
||||
t.log.Debug("Temporary UDP read error", "err", err)
|
||||
t.log.Debug("Temporary UDP read error", zap.Error(err))
|
||||
continue
|
||||
} else if err != nil {
|
||||
// Shut down the loop for permanent errors.
|
||||
if !errors.Is(err, io.EOF) {
|
||||
t.log.Debug("UDP read error", "err", err)
|
||||
t.log.Debug("UDP read error", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -543,7 +542,10 @@ func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) {
|
||||
func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error {
|
||||
rawpacket, fromKey, hash, err := v4wire.Decode(buf)
|
||||
if err != nil {
|
||||
t.log.Debug("Bad discv4 packet", "addr", from, "err", err)
|
||||
t.log.Debug("bad discv4 packet",
|
||||
zap.Stringer("addr", from),
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
}
|
||||
packet := t.wrapPacket(rawpacket)
|
||||
@ -551,7 +553,6 @@ func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error {
|
||||
if err == nil && packet.preverify != nil {
|
||||
err = packet.preverify(packet, from, fromID, fromKey)
|
||||
}
|
||||
t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", from, "err", err)
|
||||
if err == nil && packet.handle != nil {
|
||||
packet.handle(packet, from, fromID, hash)
|
||||
}
|
||||
|
||||
@ -31,11 +31,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/waku-org/go-discover/discover/v4wire"
|
||||
"github.com/waku-org/go-discover/internal/testlog"
|
||||
)
|
||||
|
||||
// shared test variables
|
||||
@ -71,7 +71,7 @@ func newUDPTest(t *testing.T) *udpTest {
|
||||
ln := enode.NewLocalNode(test.db, test.localkey)
|
||||
test.udp, _ = ListenV4(test.pipe, ln, Config{
|
||||
PrivateKey: test.localkey,
|
||||
Log: testlog.Logger(t, log.LvlTrace),
|
||||
Log: zap.NewNop(),
|
||||
})
|
||||
test.table = test.udp.tab
|
||||
// Wait for initial refresh so the table doesn't send unexpected findnode.
|
||||
@ -555,15 +555,6 @@ func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {
|
||||
db, _ := enode.OpenDB("")
|
||||
ln := enode.NewLocalNode(db, cfg.PrivateKey)
|
||||
|
||||
// Prefix logs with node ID.
|
||||
lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
|
||||
lfmt := log.TerminalFormat(false)
|
||||
cfg.Log = testlog.Logger(t, log.LvlTrace)
|
||||
cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error {
|
||||
t.Logf("%s %s", lprefix, lfmt.Format(r))
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Listen.
|
||||
socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
|
||||
if err != nil {
|
||||
|
||||
@ -29,13 +29,13 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/waku-org/go-discover/discover/v5wire"
|
||||
"github.com/waku-org/go-discover/internal/testlog"
|
||||
)
|
||||
|
||||
// Real sockets, real crypto: this test checks end-to-end connectivity for UDPv5.
|
||||
@ -79,13 +79,11 @@ func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 {
|
||||
ln := enode.NewLocalNode(db, cfg.PrivateKey)
|
||||
|
||||
// Prefix logs with node ID.
|
||||
lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
|
||||
lfmt := log.TerminalFormat(false)
|
||||
cfg.Log = testlog.Logger(t, log.LvlTrace)
|
||||
cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error {
|
||||
t.Logf("%s %s", lprefix, lfmt.Format(r))
|
||||
return nil
|
||||
}))
|
||||
logger, err := zap.NewDevelopment()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cfg.Log = logger.With(zap.String("nodeId", ln.ID().TerminalString()))
|
||||
|
||||
// Listen.
|
||||
socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
|
||||
@ -751,6 +749,11 @@ func (c *testCodec) decodeFrame(input []byte) (frame testCodecFrame, p v5wire.Pa
|
||||
}
|
||||
|
||||
func newUDPV5Test(t *testing.T) *udpV5Test {
|
||||
logger, err := zap.NewDevelopment()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
test := &udpV5Test{
|
||||
t: t,
|
||||
pipe: newpipe(),
|
||||
@ -766,7 +769,7 @@ func newUDPV5Test(t *testing.T) *udpV5Test {
|
||||
ln.Set(enr.UDP(30303))
|
||||
test.udp, _ = ListenV5(context.Background(), test.pipe, ln, Config{
|
||||
PrivateKey: test.localkey,
|
||||
Log: testlog.Logger(t, log.LvlTrace),
|
||||
Log: logger,
|
||||
ValidSchemes: enode.ValidSchemesForTesting,
|
||||
})
|
||||
test.udp.codec = &testCodec{test: test, id: ln.ID()}
|
||||
|
||||
@ -1,142 +0,0 @@
|
||||
// Copyright 2019 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package testlog provides a log handler for unit tests.
|
||||
package testlog
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
// Handler returns a log handler which logs to the unit test log of t.
|
||||
func Handler(t *testing.T, level log.Lvl) log.Handler {
|
||||
return log.LvlFilterHandler(level, &handler{t, log.TerminalFormat(false)})
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
t *testing.T
|
||||
fmt log.Format
|
||||
}
|
||||
|
||||
func (h *handler) Log(r *log.Record) error {
|
||||
h.t.Logf("%s", h.fmt.Format(r))
|
||||
return nil
|
||||
}
|
||||
|
||||
// logger implements log.Logger such that all output goes to the unit test log via
|
||||
// t.Logf(). All methods in between logger.Trace, logger.Debug, etc. are marked as test
|
||||
// helpers, so the file and line number in unit test output correspond to the call site
|
||||
// which emitted the log message.
|
||||
type logger struct {
|
||||
t *testing.T
|
||||
l log.Logger
|
||||
mu *sync.Mutex
|
||||
h *bufHandler
|
||||
}
|
||||
|
||||
type bufHandler struct {
|
||||
buf []*log.Record
|
||||
fmt log.Format
|
||||
}
|
||||
|
||||
func (h *bufHandler) Log(r *log.Record) error {
|
||||
h.buf = append(h.buf, r)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Logger returns a logger which logs to the unit test log of t.
|
||||
func Logger(t *testing.T, level log.Lvl) log.Logger {
|
||||
l := &logger{
|
||||
t: t,
|
||||
l: log.New(),
|
||||
mu: new(sync.Mutex),
|
||||
h: &bufHandler{fmt: log.TerminalFormat(false)},
|
||||
}
|
||||
l.l.SetHandler(log.LvlFilterHandler(level, l.h))
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *logger) Trace(msg string, ctx ...interface{}) {
|
||||
l.t.Helper()
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.l.Trace(msg, ctx...)
|
||||
l.flush()
|
||||
}
|
||||
|
||||
func (l *logger) Debug(msg string, ctx ...interface{}) {
|
||||
l.t.Helper()
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.l.Debug(msg, ctx...)
|
||||
l.flush()
|
||||
}
|
||||
|
||||
func (l *logger) Info(msg string, ctx ...interface{}) {
|
||||
l.t.Helper()
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.l.Info(msg, ctx...)
|
||||
l.flush()
|
||||
}
|
||||
|
||||
func (l *logger) Warn(msg string, ctx ...interface{}) {
|
||||
l.t.Helper()
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.l.Warn(msg, ctx...)
|
||||
l.flush()
|
||||
}
|
||||
|
||||
func (l *logger) Error(msg string, ctx ...interface{}) {
|
||||
l.t.Helper()
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.l.Error(msg, ctx...)
|
||||
l.flush()
|
||||
}
|
||||
|
||||
func (l *logger) Crit(msg string, ctx ...interface{}) {
|
||||
l.t.Helper()
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.l.Crit(msg, ctx...)
|
||||
l.flush()
|
||||
}
|
||||
|
||||
func (l *logger) New(ctx ...interface{}) log.Logger {
|
||||
return &logger{l.t, l.l.New(ctx...), l.mu, l.h}
|
||||
}
|
||||
|
||||
func (l *logger) GetHandler() log.Handler {
|
||||
return l.l.GetHandler()
|
||||
}
|
||||
|
||||
func (l *logger) SetHandler(h log.Handler) {
|
||||
l.l.SetHandler(h)
|
||||
}
|
||||
|
||||
// flush writes all buffered messages and clears the buffer.
|
||||
func (l *logger) flush() {
|
||||
l.t.Helper()
|
||||
for _, r := range l.h.buf {
|
||||
l.t.Logf("%s", l.h.fmt.Format(r))
|
||||
}
|
||||
l.h.buf = nil
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user