fix: code review

This commit is contained in:
Richard Ramos 2021-11-16 09:46:00 -04:00
parent 315fbadbf8
commit 4a37a4cb8a
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
8 changed files with 62 additions and 44 deletions

View File

@ -23,7 +23,7 @@ import (
wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/discovery"
"github.com/status-im/go-waku/waku/v2/dnsdisc"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
@ -169,7 +169,7 @@ func main() {
if enableDiscovery && dnsDiscoveryUrl != "" {
ui.displayMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl))
multiaddresses, err := discovery.RetrieveNodes(ctx, dnsDiscoveryUrl, discovery.WithNameserver(dnsDiscoveryNameServer))
multiaddresses, err := dnsdisc.RetrieveNodes(ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(dnsDiscoveryNameServer))
if err != nil {
ui.displayMessage("DNS discovery error: " + err.Error())
} else {

View File

@ -22,7 +22,7 @@ import (
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
libp2pdisc "github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p-core/protocol"
@ -33,7 +33,7 @@ import (
"github.com/status-im/go-waku/waku/metrics"
"github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/persistence/sqlite"
"github.com/status-im/go-waku/waku/v2/discovery"
"github.com/status-im/go-waku/waku/v2/dnsdisc"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
@ -159,7 +159,7 @@ func Execute(options Options) {
}
if options.Rendezvous.Enable {
nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(libp2pdisc.Limit(45), libp2pdisc.TTL(time.Duration(20)*time.Second))))
nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
}
wakuNode, err := node.New(ctx, nodeOpts...)
@ -205,7 +205,7 @@ func Execute(options Options) {
if options.DNSDiscovery.URL != "" {
log.Info("attempting DNS discovery with ", options.DNSDiscovery.URL)
multiaddresses, err := discovery.RetrieveNodes(ctx, options.DNSDiscovery.URL, discovery.WithNameserver(options.DNSDiscovery.Nameserver))
multiaddresses, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver))
if err != nil {
log.Warn("dns discovery error ", err)
} else {

View File

@ -117,22 +117,23 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv
return nil, err
}
discV5 := new(DiscoveryV5)
discV5.host = host
discV5.params = params
discV5.peerCache.rng = rand.New(rand.NewSource(rand.Int63()))
discV5.peerCache.recs = make(map[peer.ID]peerRecord)
discV5.localnode = localnode
discV5.config = discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
}
discV5.udpAddr = &net.UDPAddr{
IP: ipAddr,
Port: params.udpPort,
}
return discV5, nil
return &DiscoveryV5{
host: host,
params: params,
peerCache: peerCache{
rng: rand.New(rand.NewSource(rand.Int63())),
recs: make(map[peer.ID]peerRecord),
},
localnode: localnode,
config: discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
},
udpAddr: &net.UDPAddr{
IP: ipAddr,
Port: params.udpPort,
},
}, nil
}
func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP) (*enode.LocalNode, error) {
@ -145,16 +146,16 @@ func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort in
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(WakuENRField, wakuFlags))
localnode.Set(enr.IP(ipAddr)) // Test if IP changes in p2p/enode/localnode.go ?
localnode.Set(enr.UDP(udpPort))
localnode.Set(enr.TCP(tcpPort))
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
localnode.Set(enr.WithEntry(WakuENRField, wakuFlags))
localnode.Set(enr.IP(ipAddr))
localnode.Set(enr.UDP(udpPort))
localnode.Set(enr.TCP(tcpPort))
return localnode, nil
}
@ -277,6 +278,22 @@ func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan st
close(doneCh)
}
func (d *DiscoveryV5) removeExpiredPeers() int {
// Remove all expired entries from cache
currentTime := time.Now().Unix()
newCacheSize := len(d.peerCache.recs)
for p := range d.peerCache.recs {
rec := d.peerCache.recs[p]
if rec.expire < currentTime {
newCacheSize--
delete(d.peerCache.recs, p)
}
}
return newCacheSize
}
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
// Get options
var options discovery.Options
@ -296,20 +313,10 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
d.peerCache.Lock()
defer d.peerCache.Unlock()
// Remove all expired entries from cache
currentTime := time.Now().Unix()
newCacheSize := len(d.peerCache.recs)
for p := range d.peerCache.recs {
rec := d.peerCache.recs[p]
if rec.expire < currentTime {
newCacheSize--
delete(d.peerCache.recs, p)
}
}
cacheSize := d.removeExpiredPeers()
// Discover new records if we don't have enough
if newCacheSize < limit {
if cacheSize < limit {
iterator := d.listener.RandomNodes()
iterator = enode.Filter(iterator, d.evaluateNode)
defer iterator.Close()

View File

@ -1,4 +1,4 @@
package discovery
package dnsdisc
import (
"context"

View File

@ -1,4 +1,4 @@
package discovery
package dnsdisc
import (
"context"

View File

@ -1,4 +1,4 @@
package discovery
package dnsdisc
import (
"context"

View File

@ -1,4 +1,4 @@
package discovery
package dnsdisc
import (
"context"

View File

@ -123,6 +123,17 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
log.Info("Listening on ", addr)
}
go func() {
ticker := time.NewTicker(10 * time.Second)
select {
case <-ticker.C:
for _, addr := range w.ListenAddresses() {
log.Info("Listening on ", addr)
}
}
}()
return w, nil
}