diff --git a/discover/common.go b/discover/common.go index c36e8dc..d26d52c 100644 --- a/discover/common.go +++ b/discover/common.go @@ -20,8 +20,9 @@ import ( "crypto/ecdsa" "net" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/netutil" @@ -48,7 +49,7 @@ type Config struct { NetRestrict *netutil.Netlist // list of allowed IP networks Bootnodes []*enode.Node // list of bootstrap nodes Unhandled chan<- ReadPacket // unhandled packets are sent on this channel - Log log.Logger // if set, log messages go here + Log *zap.Logger // if set, log messages go here // V5ProtocolID configures the discv5 protocol identifier. V5ProtocolID *[6]byte @@ -59,7 +60,7 @@ type Config struct { func (cfg Config) withDefaults() Config { if cfg.Log == nil { - cfg.Log = log.Root() + cfg.Log = zap.NewNop() } if cfg.ValidSchemes == nil { cfg.ValidSchemes = enode.ValidSchemes diff --git a/discover/lookup.go b/discover/lookup.go index b8d97b4..91ca7a2 100644 --- a/discover/lookup.go +++ b/discover/lookup.go @@ -151,12 +151,9 @@ func (it *lookup) query(n *node, reply chan<- []*node) { it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails) // Remove the node from the local table if it fails to return anything useful too // many times, but only if there are enough other nodes in the bucket. - dropped := false if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 { - dropped = true it.tab.delete(n) } - it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err) } else if fails > 0 { // Reset failure counter because it counts _consecutive_ failures. it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0) diff --git a/discover/table.go b/discover/table.go index b7ef6b6..56088eb 100644 --- a/discover/table.go +++ b/discover/table.go @@ -32,8 +32,9 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/netutil" ) @@ -71,7 +72,7 @@ type Table struct { rand *mrand.Rand // source of randomness, periodically reseeded ips netutil.DistinctNetSet - log log.Logger + log *zap.Logger db *enode.DB // database of known nodes net transport refreshReq chan chan struct{} @@ -99,26 +100,32 @@ type bucket struct { ips netutil.DistinctNetSet } -func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) { +func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, logger *zap.Logger) (*Table, error) { + buckets := [nBuckets]*bucket{} + for i := range buckets { + buckets[i] = &bucket{ + ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, + } + } + nursery := wrapNodes(bootnodes) + tab := &Table{ - net: t, + mutex: sync.Mutex{}, + buckets: buckets, + nursery: nursery, + rand: mrand.New(mrand.NewSource(time.Now().UnixNano())), + ips: netutil.DistinctNetSet{}, + log: logger, db: db, + net: t, refreshReq: make(chan chan struct{}), initDone: make(chan struct{}), closeReq: make(chan struct{}), closed: make(chan struct{}), - rand: mrand.New(mrand.NewSource(0)), - ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, - log: log, } if err := tab.setFallbackNodes(bootnodes); err != nil { return nil, err } - for i := range tab.buckets { - tab.buckets[i] = &bucket{ - ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, - } - } tab.seedRand() tab.loadSeedNodes() @@ -309,8 +316,6 @@ func (tab *Table) loadSeedNodes() { tab.mutex.Unlock() for i := range seeds { seed := seeds[i] - age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }} - tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) tab.addSeenNode(seed) } } @@ -329,32 +334,41 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { // Ping the selected node and wait for a pong. remoteSeq, err := tab.net.ping(unwrapNode(last)) + logger := tab.log.With(zap.String("id", last.ID().String()), zap.String("addr", last.addr().String())) + // Also fetch record if the node replied and returned a higher sequence number. if last.Seq() < remoteSeq { n, err := tab.net.RequestENR(unwrapNode(last)) if err != nil { - tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) + logger.Debug("ENR request failed", zap.Error(err)) } else { last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} } } + logger = logger.With(zap.Int("b", bi)) + tab.mutex.Lock() defer tab.mutex.Unlock() b := tab.buckets[bi] if err == nil { // The node responded, move it to the front. last.livenessChecks++ - tab.log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) + logger.Debug("Revalidated node", zap.Uint("checks", last.livenessChecks)) tab.bumpInBucket(b, last) return } + // No reply received, pick a replacement or delete the node if there aren't // any replacements. if r := tab.replace(b, last); r != nil { - tab.log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP()) + tab.log.Debug("Replaced dead node", + zap.Uint("checks", last.livenessChecks), + zap.String("r", r.ID().String()), + zap.String("rip", r.IP().String())) } else { - tab.log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks) + tab.log.Debug("Removed dead node", + zap.Uint("checks", last.livenessChecks)) } } @@ -552,11 +566,11 @@ func (tab *Table) addIP(b *bucket, ip net.IP) bool { return true } if !tab.ips.Add(ip) { - tab.log.Debug("IP exceeds table limit", "ip", ip) + tab.log.Debug("IP exceeds table limit", zap.Stringer("ip", ip)) return false } if !b.ips.Add(ip) { - tab.log.Debug("IP exceeds bucket limit", "ip", ip) + tab.log.Debug("IP exceeds bucket limit", zap.Stringer("ip", ip)) tab.ips.Remove(ip) return false } diff --git a/discover/table_util_test.go b/discover/table_util_test.go index 527afbb..f449212 100644 --- a/discover/table_util_test.go +++ b/discover/table_util_test.go @@ -27,8 +27,9 @@ import ( "sort" "sync" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" ) @@ -43,7 +44,11 @@ func init() { func newTestTable(t transport) (*Table, *enode.DB) { db, _ := enode.OpenDB("") - tab, _ := newTable(t, db, nil, log.Root()) + logger, err := zap.NewDevelopment() + if err != nil { + panic(err) + } + tab, _ := newTable(t, db, nil, logger) go tab.loop() return tab, db } diff --git a/discover/v4_udp.go b/discover/v4_udp.go index de8ecb3..d40b59b 100644 --- a/discover/v4_udp.go +++ b/discover/v4_udp.go @@ -29,8 +29,9 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/waku-org/go-discover/discover/v4wire" @@ -66,7 +67,7 @@ const ( // UDPv4 implements the v4 wire protocol. type UDPv4 struct { conn UDPConn - log log.Logger + log *zap.Logger netrestrict *netutil.Netlist priv *ecdsa.PrivateKey localNode *enode.LocalNode @@ -311,7 +312,6 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke nreceived++ n, err := t.nodeFromRPC(toaddr, rn) if err != nil { - t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err) continue } nodes = append(nodes, n) @@ -506,7 +506,6 @@ func (t *UDPv4) send(toaddr *net.UDPAddr, toid enode.ID, req v4wire.Packet) ([]b func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet []byte) error { _, err := t.conn.WriteToUDP(packet, toaddr) - t.log.Trace(">> "+what, "id", toid, "addr", toaddr, "err", err) return err } @@ -522,12 +521,12 @@ func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { nbytes, from, err := t.conn.ReadFromUDP(buf) if netutil.IsTemporaryError(err) { // Ignore temporary read errors. - t.log.Debug("Temporary UDP read error", "err", err) + t.log.Debug("Temporary UDP read error", zap.Error(err)) continue } else if err != nil { // Shut down the loop for permanent errors. if !errors.Is(err, io.EOF) { - t.log.Debug("UDP read error", "err", err) + t.log.Debug("UDP read error", zap.Error(err)) } return } @@ -543,7 +542,10 @@ func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error { rawpacket, fromKey, hash, err := v4wire.Decode(buf) if err != nil { - t.log.Debug("Bad discv4 packet", "addr", from, "err", err) + t.log.Debug("bad discv4 packet", + zap.Stringer("addr", from), + zap.Error(err), + ) return err } packet := t.wrapPacket(rawpacket) @@ -551,7 +553,6 @@ func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error { if err == nil && packet.preverify != nil { err = packet.preverify(packet, from, fromID, fromKey) } - t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", from, "err", err) if err == nil && packet.handle != nil { packet.handle(packet, from, fromID, hash) } diff --git a/discover/v4_udp_test.go b/discover/v4_udp_test.go index aeacdc2..f0973f2 100644 --- a/discover/v4_udp_test.go +++ b/discover/v4_udp_test.go @@ -31,11 +31,11 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/log" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/waku-org/go-discover/discover/v4wire" - "github.com/waku-org/go-discover/internal/testlog" ) // shared test variables @@ -71,7 +71,7 @@ func newUDPTest(t *testing.T) *udpTest { ln := enode.NewLocalNode(test.db, test.localkey) test.udp, _ = ListenV4(test.pipe, ln, Config{ PrivateKey: test.localkey, - Log: testlog.Logger(t, log.LvlTrace), + Log: zap.NewNop(), }) test.table = test.udp.tab // Wait for initial refresh so the table doesn't send unexpected findnode. @@ -555,15 +555,6 @@ func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 { db, _ := enode.OpenDB("") ln := enode.NewLocalNode(db, cfg.PrivateKey) - // Prefix logs with node ID. - lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString()) - lfmt := log.TerminalFormat(false) - cfg.Log = testlog.Logger(t, log.LvlTrace) - cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error { - t.Logf("%s %s", lprefix, lfmt.Format(r)) - return nil - })) - // Listen. socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}}) if err != nil { diff --git a/discover/v5_udp_test.go b/discover/v5_udp_test.go index 52862ac..780efe4 100644 --- a/discover/v5_udp_test.go +++ b/discover/v5_udp_test.go @@ -29,13 +29,13 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/rlp" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "github.com/waku-org/go-discover/discover/v5wire" - "github.com/waku-org/go-discover/internal/testlog" ) // Real sockets, real crypto: this test checks end-to-end connectivity for UDPv5. @@ -79,13 +79,11 @@ func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 { ln := enode.NewLocalNode(db, cfg.PrivateKey) // Prefix logs with node ID. - lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString()) - lfmt := log.TerminalFormat(false) - cfg.Log = testlog.Logger(t, log.LvlTrace) - cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error { - t.Logf("%s %s", lprefix, lfmt.Format(r)) - return nil - })) + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + cfg.Log = logger.With(zap.String("nodeId", ln.ID().TerminalString())) // Listen. socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}}) @@ -751,6 +749,11 @@ func (c *testCodec) decodeFrame(input []byte) (frame testCodecFrame, p v5wire.Pa } func newUDPV5Test(t *testing.T) *udpV5Test { + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + test := &udpV5Test{ t: t, pipe: newpipe(), @@ -766,7 +769,7 @@ func newUDPV5Test(t *testing.T) *udpV5Test { ln.Set(enr.UDP(30303)) test.udp, _ = ListenV5(context.Background(), test.pipe, ln, Config{ PrivateKey: test.localkey, - Log: testlog.Logger(t, log.LvlTrace), + Log: logger, ValidSchemes: enode.ValidSchemesForTesting, }) test.udp.codec = &testCodec{test: test, id: ln.ID()} diff --git a/internal/testlog/testlog.go b/internal/testlog/testlog.go deleted file mode 100644 index 684339f..0000000 --- a/internal/testlog/testlog.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -// Package testlog provides a log handler for unit tests. -package testlog - -import ( - "sync" - "testing" - - "github.com/ethereum/go-ethereum/log" -) - -// Handler returns a log handler which logs to the unit test log of t. -func Handler(t *testing.T, level log.Lvl) log.Handler { - return log.LvlFilterHandler(level, &handler{t, log.TerminalFormat(false)}) -} - -type handler struct { - t *testing.T - fmt log.Format -} - -func (h *handler) Log(r *log.Record) error { - h.t.Logf("%s", h.fmt.Format(r)) - return nil -} - -// logger implements log.Logger such that all output goes to the unit test log via -// t.Logf(). All methods in between logger.Trace, logger.Debug, etc. are marked as test -// helpers, so the file and line number in unit test output correspond to the call site -// which emitted the log message. -type logger struct { - t *testing.T - l log.Logger - mu *sync.Mutex - h *bufHandler -} - -type bufHandler struct { - buf []*log.Record - fmt log.Format -} - -func (h *bufHandler) Log(r *log.Record) error { - h.buf = append(h.buf, r) - return nil -} - -// Logger returns a logger which logs to the unit test log of t. -func Logger(t *testing.T, level log.Lvl) log.Logger { - l := &logger{ - t: t, - l: log.New(), - mu: new(sync.Mutex), - h: &bufHandler{fmt: log.TerminalFormat(false)}, - } - l.l.SetHandler(log.LvlFilterHandler(level, l.h)) - return l -} - -func (l *logger) Trace(msg string, ctx ...interface{}) { - l.t.Helper() - l.mu.Lock() - defer l.mu.Unlock() - l.l.Trace(msg, ctx...) - l.flush() -} - -func (l *logger) Debug(msg string, ctx ...interface{}) { - l.t.Helper() - l.mu.Lock() - defer l.mu.Unlock() - l.l.Debug(msg, ctx...) - l.flush() -} - -func (l *logger) Info(msg string, ctx ...interface{}) { - l.t.Helper() - l.mu.Lock() - defer l.mu.Unlock() - l.l.Info(msg, ctx...) - l.flush() -} - -func (l *logger) Warn(msg string, ctx ...interface{}) { - l.t.Helper() - l.mu.Lock() - defer l.mu.Unlock() - l.l.Warn(msg, ctx...) - l.flush() -} - -func (l *logger) Error(msg string, ctx ...interface{}) { - l.t.Helper() - l.mu.Lock() - defer l.mu.Unlock() - l.l.Error(msg, ctx...) - l.flush() -} - -func (l *logger) Crit(msg string, ctx ...interface{}) { - l.t.Helper() - l.mu.Lock() - defer l.mu.Unlock() - l.l.Crit(msg, ctx...) - l.flush() -} - -func (l *logger) New(ctx ...interface{}) log.Logger { - return &logger{l.t, l.l.New(ctx...), l.mu, l.h} -} - -func (l *logger) GetHandler() log.Handler { - return l.l.GetHandler() -} - -func (l *logger) SetHandler(h log.Handler) { - l.l.SetHandler(h) -} - -// flush writes all buffered messages and clears the buffer. -func (l *logger) flush() { - l.t.Helper() - for _, r := range l.h.buf { - l.t.Logf("%s", l.h.fmt.Format(r)) - } - l.h.buf = nil -}