From 6e7c3b6183709cfea655bfaeecf41a482c3766df Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 11 Jan 2023 22:20:23 -0400 Subject: [PATCH] feat: update localnode ENR without having to restart discv5 --- cmd/waku/flags.go | 2 +- mobile/api.go | 4 +- tests/utils.go | 30 ++++++++ waku/options.go | 2 +- waku/v2/discv5/discover.go | 8 ++- waku/v2/discv5/discover_test.go | 12 ++-- waku/v2/node/localnode.go | 69 ++++++++++--------- waku/v2/node/wakunode2.go | 47 ++++++++----- waku/v2/node/wakuoptions.go | 4 +- .../peer_exchange/waku_peer_exchange_test.go | 4 +- 10 files changed, 116 insertions(+), 66 deletions(-) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index ea21d991..f40c67a9 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -342,7 +342,7 @@ var ( Destination: &options.DiscV5.Nodes, EnvVars: []string{"WAKUNODE2_DISCV5_BOOTSTRAP_NODE"}, }) - Discv5UDPPort = altsrc.NewIntFlag(&cli.IntFlag{ + Discv5UDPPort = altsrc.NewUintFlag(&cli.UintFlag{ Name: "discv5-udp-port", Value: 9000, Usage: "Listening UDP port for Node Discovery v5.", diff --git a/mobile/api.go b/mobile/api.go index 8fc3769b..e92263b0 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -56,7 +56,7 @@ type wakuConfig struct { MinPeersToPublish *int `json:"minPeersToPublish"` EnableDiscV5 *bool `json:"discV5"` DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes"` - DiscV5UDPPort *int `json:"discV5UDPPort"` + DiscV5UDPPort *uint `json:"discV5UDPPort"` } var defaultHost = "0.0.0.0" @@ -66,7 +66,7 @@ var defaultEnableRelay = true var defaultMinPeersToPublish = 0 var defaultEnableFilter = false var defaultEnableDiscV5 = false -var defaultDiscV5UDPPort = 9000 +var defaultDiscV5UDPPort = uint(9000) var defaultLogLevel = "INFO" func getConfig(configJSON string) (wakuConfig, error) { diff --git a/tests/utils.go b/tests/utils.go index 06118470..5f0b2b76 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -52,6 +52,36 @@ func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) { return 0, fmt.Errorf("no free port found") } +// FindFreePort returns an available port number +func FindFreeUDPPort(t *testing.T, host string, maxAttempts int) (int, error) { + t.Helper() + + if host == "" { + host = "localhost" + } + + for i := 0; i < maxAttempts; i++ { + addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, "0")) + if err != nil { + t.Logf("unable to resolve tcp addr: %v", err) + continue + } + l, err := net.ListenUDP("udp", addr) + if err != nil { + l.Close() + t.Logf("unable to listen on addr %q: %v", addr, err) + continue + } + + port := l.LocalAddr().(*net.UDPAddr).Port + l.Close() + return port, nil + + } + + return 0, fmt.Errorf("no free port found") +} + // MakeHost creates a Libp2p host with a random key on a specific port func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) { // Creates a new RSA key pair for this host. diff --git a/waku/options.go b/waku/options.go index 26596d2a..142d4596 100644 --- a/waku/options.go +++ b/waku/options.go @@ -14,7 +14,7 @@ import ( type DiscV5Options struct { Enable bool Nodes cli.StringSlice - Port int + Port uint AutoUpdate bool } diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 8df3498b..e0f0d238 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -58,7 +58,7 @@ type PeerRecord struct { type discV5Parameters struct { autoUpdate bool bootnodes []*enode.Node - udpPort int + udpPort uint advertiseAddr *net.IP } @@ -84,7 +84,7 @@ func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option { } } -func WithUDPPort(port int) DiscoveryV5Option { +func WithUDPPort(port uint) DiscoveryV5Option { return func(params *discV5Parameters) { params.udpPort = port } @@ -132,7 +132,7 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc }, udpAddr: &net.UDPAddr{ IP: net.IPv4zero, - Port: params.udpPort, + Port: int(params.udpPort), }, log: logger, }, nil @@ -250,6 +250,8 @@ func hasTCPPort(node *enode.Node) bool { } func evaluateNode(node *enode.Node) bool { + // TODO: track https://github.com/status-im/nim-waku/issues/770 for improvements over validation func + if node == nil || node.IP() == nil { return false } diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index 504e86c5..8b91324e 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -101,32 +101,32 @@ func TestDiscV5(t *testing.T) { // H1 host1, _, prvKey1 := createHost(t) - udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) + udpPort1, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) ip1, _ := extractIP(host1.Addrs()[0]) l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(udpPort1)) + d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(uint(udpPort1))) require.NoError(t, err) // H2 host2, _, prvKey2 := createHost(t) ip2, _ := extractIP(host2.Addrs()[0]) - udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) + udpPort2, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) // H3 host3, _, prvKey3 := createHost(t) ip3, _ := extractIP(host3.Addrs()[0]) - udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) + udpPort3, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) defer d1.Stop() diff --git a/waku/v2/node/localnode.go b/waku/v2/node/localnode.go index 4b4f632d..0ff53a3f 100644 --- a/waku/v2/node/localnode.go +++ b/waku/v2/node/localnode.go @@ -26,24 +26,47 @@ func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error return enode.NewLocalNode(db, priv), nil } -func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) error { - localnode.SetFallbackUDP(udpPort) +func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool, log *zap.Logger) error { + localnode.SetFallbackUDP(int(udpPort)) localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags)) localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - localnode.SetStaticIP(ipAddr.IP) - if udpPort > 0 && udpPort <= math.MaxUint16 { - localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] - } - - if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) + if udpPort > math.MaxUint16 { + return errors.New("invalid udp port number") } if advertiseAddr != nil { + // An advertised address disables libp2p address updates + // and discv5 predictions localnode.SetStaticIP(*advertiseAddr) + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6? + } else if !shouldAutoUpdate { + // We received a libp2p address update. Autoupdate is disabled + // Using a static ip will disable endpoint prediction. + localnode.SetStaticIP(ipAddr.IP) + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6? + } else { + // We received a libp2p address update, but we should still + // allow discv5 to update the enr record. We set the localnode + // keys manually. It's possible that the ENR record might get + // updated automatically + ip4 := ipAddr.IP.To4() + ip6 := ipAddr.IP.To16() + if ip4 != nil && !ip4.IsUnspecified() { + localnode.Set(enr.IPv4(ip4)) + localnode.Set(enr.TCP(uint16(ipAddr.Port))) + } else { + localnode.Delete(enr.IPv4{}) + localnode.Delete(enr.TCP(0)) + } + + if ip6 != nil && !ip6.IsUnspecified() { + localnode.Set(enr.IPv6(ip6)) + localnode.Set(enr.TCP6(ipAddr.Port)) + } else { + localnode.Delete(enr.IPv6{}) + localnode.Delete(enr.TCP6(0)) + } } // Adding websocket multiaddresses @@ -222,26 +245,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error { return err } - // TODO: make this optional depending on DNS Disc being enabled - if w.opts.privKey != nil && w.localNode != nil { - err := w.updateLocalNode(w.localNode, w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log) - if err != nil { - w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err)) - return err - } else { - w.log.Info("enr record", logging.ENode("enr", w.localNode.Node())) - // Restarting DiscV5 - discV5 := w.DiscV5() - if discV5 != nil && discV5.IsStarted() { - w.log.Info("restarting discv5") - w.discoveryV5.Stop() - err = w.discoveryV5.Start(ctx) - if err != nil { - w.log.Error("could not restart discv5", zap.Error(err)) - return err - } - } - } + err = w.updateLocalNode(w.localNode, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.opts.discV5autoUpdate, w.log) + if err != nil { + w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err)) + return err } return nil diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index b22ed781..56ff0663 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -83,8 +83,6 @@ type WakuNode struct { localNode *enode.LocalNode - addrChan chan ma.Multiaddr - bcaster v2.Broadcaster connectionNotif ConnectionNotifier @@ -166,7 +164,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.opts = params w.log = params.logger.Named("node2") w.wg = &sync.WaitGroup{} - w.addrChan = make(chan ma.Multiaddr, 1024) w.keepAliveFails = make(map[peer.ID]int) w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay) @@ -231,14 +228,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { return w, nil } -func (w *WakuNode) onAddrChange() { - for m := range w.addrChan { - _ = m - // TODO: determine if still needed. Otherwise remove - } -} - -func (w *WakuNode) checkForAddressChanges(ctx context.Context) { +func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) { defer w.wg.Done() addrs := w.ListenAddresses() @@ -247,7 +237,6 @@ func (w *WakuNode) checkForAddressChanges(ctx context.Context) { for { select { case <-ctx.Done(): - close(w.addrChan) return case <-first: w.log.Info("listening", logging.MultiAddrs("multiaddr", addrs...)) @@ -267,9 +256,6 @@ func (w *WakuNode) checkForAddressChanges(ctx context.Context) { if diff { addrs = newAddrs w.log.Info("listening addresses update received", logging.MultiAddrs("multiaddr", addrs...)) - for _, addr := range addrs { - w.addrChan <- addr - } _ = w.setupENR(ctx, addrs) } } @@ -284,10 +270,10 @@ func (w *WakuNode) Start(ctx context.Context) error { w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log) w.host.Network().Notify(w.connectionNotif) - w.wg.Add(2) + w.wg.Add(3) go w.connectednessListener(ctx) - go w.checkForAddressChanges(ctx) - go w.onAddrChange() + go w.watchMultiaddressChanges(ctx) + go w.watchENRChanges(ctx) if w.opts.keepAliveInterval > time.Duration(0) { w.wg.Add(1) @@ -408,6 +394,31 @@ func (w *WakuNode) ID() string { return w.host.ID().Pretty() } +func (w *WakuNode) watchENRChanges(ctx context.Context) { + defer w.wg.Done() + + timer := time.NewTicker(1 * time.Second) + var prevNodeVal string + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + if w.localNode != nil { + currNodeVal := w.localNode.Node().String() + if prevNodeVal != currNodeVal { + if prevNodeVal == "" { + w.log.Info("enr record", logging.ENode("enr", w.localNode.Node())) + } else { + w.log.Info("new enr record", logging.ENode("enr", w.localNode.Node())) + } + prevNodeVal = currNodeVal + } + } + } + } +} + // ListenAddresses returns all the multiaddresses used by the host func (w *WakuNode) ListenAddresses() []ma.Multiaddr { hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty())) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 8c331f6e..7d9f9350 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -75,7 +75,7 @@ type WakuNodeParameters struct { swapPaymentThreshold int enableDiscV5 bool - udpPort int + udpPort uint discV5bootnodes []*enode.Node discV5Opts []pubsub.DiscoverOpt discV5autoUpdate bool @@ -282,7 +282,7 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option) } // WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery -func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { +func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableDiscV5 = true params.udpPort = udpPort diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 363fb528..93b933fe 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -105,7 +105,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { ip1, _ := extractIP(host1.Addrs()[0]) l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(udpPort1)) + d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) require.NoError(t, err) // H2 @@ -115,7 +115,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { require.NoError(t, err) l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(udpPort2), discv5.WithBootnodes([]*enode.Node{d1.Node()})) + d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) require.NoError(t, err) // H3