From 4a37a4cb8a664d0d5770ae62aa34df7d244fcd93 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 16 Nov 2021 09:46:00 -0400 Subject: [PATCH] fix: code review --- examples/chat2/main.go | 4 +- waku/node.go | 8 +- waku/v2/discv5/discover.go | 75 ++++++++++--------- waku/v2/{discovery => dnsdisc}/enr.go | 2 +- waku/v2/{discovery => dnsdisc}/enr_test.go | 2 +- waku/v2/{discovery => dnsdisc}/resolver.go | 2 +- .../{discovery => dnsdisc}/resolver_test.go | 2 +- waku/v2/node/wakunode2.go | 11 +++ 8 files changed, 62 insertions(+), 44 deletions(-) rename waku/v2/{discovery => dnsdisc}/enr.go (98%) rename waku/v2/{discovery => dnsdisc}/enr_test.go (98%) rename waku/v2/{discovery => dnsdisc}/resolver.go (96%) rename waku/v2/{discovery => dnsdisc}/resolver_test.go (96%) diff --git a/examples/chat2/main.go b/examples/chat2/main.go index 69e27f9a..91036a1b 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -23,7 +23,7 @@ import ( wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol" "github.com/multiformats/go-multiaddr" - "github.com/status-im/go-waku/waku/v2/discovery" + "github.com/status-im/go-waku/waku/v2/dnsdisc" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" @@ -169,7 +169,7 @@ func main() { if enableDiscovery && dnsDiscoveryUrl != "" { ui.displayMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl)) - multiaddresses, err := discovery.RetrieveNodes(ctx, dnsDiscoveryUrl, discovery.WithNameserver(dnsDiscoveryNameServer)) + multiaddresses, err := dnsdisc.RetrieveNodes(ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(dnsDiscoveryNameServer)) if err != nil { ui.displayMessage("DNS discovery error: " + err.Error()) } else { diff --git a/waku/node.go b/waku/node.go index 19fe3533..9ddfb2bb 100644 --- a/waku/node.go +++ b/waku/node.go @@ -22,7 +22,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" - libp2pdisc "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p-core/protocol" @@ -33,7 +33,7 @@ import ( "github.com/status-im/go-waku/waku/metrics" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence/sqlite" - "github.com/status-im/go-waku/waku/v2/discovery" + "github.com/status-im/go-waku/waku/v2/dnsdisc" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" @@ -159,7 +159,7 @@ func Execute(options Options) { } if options.Rendezvous.Enable { - nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(libp2pdisc.Limit(45), libp2pdisc.TTL(time.Duration(20)*time.Second)))) + nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second)))) } wakuNode, err := node.New(ctx, nodeOpts...) @@ -205,7 +205,7 @@ func Execute(options Options) { if options.DNSDiscovery.URL != "" { log.Info("attempting DNS discovery with ", options.DNSDiscovery.URL) - multiaddresses, err := discovery.RetrieveNodes(ctx, options.DNSDiscovery.URL, discovery.WithNameserver(options.DNSDiscovery.Nameserver)) + multiaddresses, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver)) if err != nil { log.Warn("dns discovery error ", err) } else { diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 67c3ec45..84000316 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -117,22 +117,23 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv return nil, err } - discV5 := new(DiscoveryV5) - discV5.host = host - discV5.params = params - discV5.peerCache.rng = rand.New(rand.NewSource(rand.Int63())) - discV5.peerCache.recs = make(map[peer.ID]peerRecord) - discV5.localnode = localnode - discV5.config = discover.Config{ - PrivateKey: priv, - Bootnodes: params.bootnodes, - } - discV5.udpAddr = &net.UDPAddr{ - IP: ipAddr, - Port: params.udpPort, - } - - return discV5, nil + return &DiscoveryV5{ + host: host, + params: params, + peerCache: peerCache{ + rng: rand.New(rand.NewSource(rand.Int63())), + recs: make(map[peer.ID]peerRecord), + }, + localnode: localnode, + config: discover.Config{ + PrivateKey: priv, + Bootnodes: params.bootnodes, + }, + udpAddr: &net.UDPAddr{ + IP: ipAddr, + Port: params.udpPort, + }, + }, nil } func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP) (*enode.LocalNode, error) { @@ -145,16 +146,16 @@ func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort in localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(WakuENRField, wakuFlags)) + + localnode.Set(enr.IP(ipAddr)) // Test if IP changes in p2p/enode/localnode.go ? + localnode.Set(enr.UDP(udpPort)) + localnode.Set(enr.TCP(tcpPort)) + if advertiseAddr != nil { localnode.SetStaticIP(*advertiseAddr) } - localnode.Set(enr.WithEntry(WakuENRField, wakuFlags)) - - localnode.Set(enr.IP(ipAddr)) - localnode.Set(enr.UDP(udpPort)) - localnode.Set(enr.TCP(tcpPort)) - return localnode, nil } @@ -277,6 +278,22 @@ func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan st close(doneCh) } +func (d *DiscoveryV5) removeExpiredPeers() int { + // Remove all expired entries from cache + currentTime := time.Now().Unix() + newCacheSize := len(d.peerCache.recs) + + for p := range d.peerCache.recs { + rec := d.peerCache.recs[p] + if rec.expire < currentTime { + newCacheSize-- + delete(d.peerCache.recs, p) + } + } + + return newCacheSize +} + func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { // Get options var options discovery.Options @@ -296,20 +313,10 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco d.peerCache.Lock() defer d.peerCache.Unlock() - // Remove all expired entries from cache - currentTime := time.Now().Unix() - newCacheSize := len(d.peerCache.recs) - - for p := range d.peerCache.recs { - rec := d.peerCache.recs[p] - if rec.expire < currentTime { - newCacheSize-- - delete(d.peerCache.recs, p) - } - } + cacheSize := d.removeExpiredPeers() // Discover new records if we don't have enough - if newCacheSize < limit { + if cacheSize < limit { iterator := d.listener.RandomNodes() iterator = enode.Filter(iterator, d.evaluateNode) defer iterator.Close() diff --git a/waku/v2/discovery/enr.go b/waku/v2/dnsdisc/enr.go similarity index 98% rename from waku/v2/discovery/enr.go rename to waku/v2/dnsdisc/enr.go index bb538fd0..0d9212a9 100644 --- a/waku/v2/discovery/enr.go +++ b/waku/v2/dnsdisc/enr.go @@ -1,4 +1,4 @@ -package discovery +package dnsdisc import ( "context" diff --git a/waku/v2/discovery/enr_test.go b/waku/v2/dnsdisc/enr_test.go similarity index 98% rename from waku/v2/discovery/enr_test.go rename to waku/v2/dnsdisc/enr_test.go index 843cca88..28c69274 100644 --- a/waku/v2/discovery/enr_test.go +++ b/waku/v2/dnsdisc/enr_test.go @@ -1,4 +1,4 @@ -package discovery +package dnsdisc import ( "context" diff --git a/waku/v2/discovery/resolver.go b/waku/v2/dnsdisc/resolver.go similarity index 96% rename from waku/v2/discovery/resolver.go rename to waku/v2/dnsdisc/resolver.go index 9708f42e..8985b33b 100644 --- a/waku/v2/discovery/resolver.go +++ b/waku/v2/dnsdisc/resolver.go @@ -1,4 +1,4 @@ -package discovery +package dnsdisc import ( "context" diff --git a/waku/v2/discovery/resolver_test.go b/waku/v2/dnsdisc/resolver_test.go similarity index 96% rename from waku/v2/discovery/resolver_test.go rename to waku/v2/dnsdisc/resolver_test.go index b16a44ba..3d34b57f 100644 --- a/waku/v2/discovery/resolver_test.go +++ b/waku/v2/dnsdisc/resolver_test.go @@ -1,4 +1,4 @@ -package discovery +package dnsdisc import ( "context" diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 32b3de51..c170296c 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -123,6 +123,17 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { log.Info("Listening on ", addr) } + go func() { + ticker := time.NewTicker(10 * time.Second) + select { + case <-ticker.C: + for _, addr := range w.ListenAddresses() { + log.Info("Listening on ", addr) + } + } + + }() + return w, nil }