feat: update localnode ENR without having to restart discv5

This commit is contained in:
Richard Ramos 2023-01-11 22:20:23 -04:00 committed by RichΛrd
parent 34c5e97d5e
commit 6e7c3b6183
10 changed files with 116 additions and 66 deletions

View File

@ -342,7 +342,7 @@ var (
Destination: &options.DiscV5.Nodes, Destination: &options.DiscV5.Nodes,
EnvVars: []string{"WAKUNODE2_DISCV5_BOOTSTRAP_NODE"}, EnvVars: []string{"WAKUNODE2_DISCV5_BOOTSTRAP_NODE"},
}) })
Discv5UDPPort = altsrc.NewIntFlag(&cli.IntFlag{ Discv5UDPPort = altsrc.NewUintFlag(&cli.UintFlag{
Name: "discv5-udp-port", Name: "discv5-udp-port",
Value: 9000, Value: 9000,
Usage: "Listening UDP port for Node Discovery v5.", Usage: "Listening UDP port for Node Discovery v5.",

View File

@ -56,7 +56,7 @@ type wakuConfig struct {
MinPeersToPublish *int `json:"minPeersToPublish"` MinPeersToPublish *int `json:"minPeersToPublish"`
EnableDiscV5 *bool `json:"discV5"` EnableDiscV5 *bool `json:"discV5"`
DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes"` DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes"`
DiscV5UDPPort *int `json:"discV5UDPPort"` DiscV5UDPPort *uint `json:"discV5UDPPort"`
} }
var defaultHost = "0.0.0.0" var defaultHost = "0.0.0.0"
@ -66,7 +66,7 @@ var defaultEnableRelay = true
var defaultMinPeersToPublish = 0 var defaultMinPeersToPublish = 0
var defaultEnableFilter = false var defaultEnableFilter = false
var defaultEnableDiscV5 = false var defaultEnableDiscV5 = false
var defaultDiscV5UDPPort = 9000 var defaultDiscV5UDPPort = uint(9000)
var defaultLogLevel = "INFO" var defaultLogLevel = "INFO"
func getConfig(configJSON string) (wakuConfig, error) { func getConfig(configJSON string) (wakuConfig, error) {

View File

@ -52,6 +52,36 @@ func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) {
return 0, fmt.Errorf("no free port found") return 0, fmt.Errorf("no free port found")
} }
// FindFreePort returns an available port number
func FindFreeUDPPort(t *testing.T, host string, maxAttempts int) (int, error) {
t.Helper()
if host == "" {
host = "localhost"
}
for i := 0; i < maxAttempts; i++ {
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, "0"))
if err != nil {
t.Logf("unable to resolve tcp addr: %v", err)
continue
}
l, err := net.ListenUDP("udp", addr)
if err != nil {
l.Close()
t.Logf("unable to listen on addr %q: %v", addr, err)
continue
}
port := l.LocalAddr().(*net.UDPAddr).Port
l.Close()
return port, nil
}
return 0, fmt.Errorf("no free port found")
}
// MakeHost creates a Libp2p host with a random key on a specific port // MakeHost creates a Libp2p host with a random key on a specific port
func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) { func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) {
// Creates a new RSA key pair for this host. // Creates a new RSA key pair for this host.

View File

@ -14,7 +14,7 @@ import (
type DiscV5Options struct { type DiscV5Options struct {
Enable bool Enable bool
Nodes cli.StringSlice Nodes cli.StringSlice
Port int Port uint
AutoUpdate bool AutoUpdate bool
} }

View File

@ -58,7 +58,7 @@ type PeerRecord struct {
type discV5Parameters struct { type discV5Parameters struct {
autoUpdate bool autoUpdate bool
bootnodes []*enode.Node bootnodes []*enode.Node
udpPort int udpPort uint
advertiseAddr *net.IP advertiseAddr *net.IP
} }
@ -84,7 +84,7 @@ func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option {
} }
} }
func WithUDPPort(port int) DiscoveryV5Option { func WithUDPPort(port uint) DiscoveryV5Option {
return func(params *discV5Parameters) { return func(params *discV5Parameters) {
params.udpPort = port params.udpPort = port
} }
@ -132,7 +132,7 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
}, },
udpAddr: &net.UDPAddr{ udpAddr: &net.UDPAddr{
IP: net.IPv4zero, IP: net.IPv4zero,
Port: params.udpPort, Port: int(params.udpPort),
}, },
log: logger, log: logger,
}, nil }, nil
@ -250,6 +250,8 @@ func hasTCPPort(node *enode.Node) bool {
} }
func evaluateNode(node *enode.Node) bool { func evaluateNode(node *enode.Node) bool {
// TODO: track https://github.com/status-im/nim-waku/issues/770 for improvements over validation func
if node == nil || node.IP() == nil { if node == nil || node.IP() == nil {
return false return false
} }

View File

@ -101,32 +101,32 @@ func TestDiscV5(t *testing.T) {
// H1 // H1
host1, _, prvKey1 := createHost(t) host1, _, prvKey1 := createHost(t)
udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) udpPort1, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3)
require.NoError(t, err) require.NoError(t, err)
ip1, _ := extractIP(host1.Addrs()[0]) ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(udpPort1)) d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(uint(udpPort1)))
require.NoError(t, err) require.NoError(t, err)
// H2 // H2
host2, _, prvKey2 := createHost(t) host2, _, prvKey2 := createHost(t)
ip2, _ := extractIP(host2.Addrs()[0]) ip2, _ := extractIP(host2.Addrs()[0])
udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) udpPort2, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3)
require.NoError(t, err) require.NoError(t, err)
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
require.NoError(t, err) require.NoError(t, err)
// H3 // H3
host3, _, prvKey3 := createHost(t) host3, _, prvKey3 := createHost(t)
ip3, _ := extractIP(host3.Addrs()[0]) ip3, _ := extractIP(host3.Addrs()[0])
udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) udpPort3, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3)
require.NoError(t, err) require.NoError(t, err)
l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
require.NoError(t, err) require.NoError(t, err)
defer d1.Stop() defer d1.Stop()

View File

@ -26,24 +26,47 @@ func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error
return enode.NewLocalNode(db, priv), nil return enode.NewLocalNode(db, priv), nil
} }
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) error { func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool, log *zap.Logger) error {
localnode.SetFallbackUDP(udpPort) localnode.SetFallbackUDP(int(udpPort))
localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags)) localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetStaticIP(ipAddr.IP)
if udpPort > 0 && udpPort <= math.MaxUint16 { if udpPort > math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] return errors.New("invalid udp port number")
}
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
} }
if advertiseAddr != nil { if advertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
localnode.SetStaticIP(*advertiseAddr) localnode.SetStaticIP(*advertiseAddr)
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
} else if !shouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
localnode.SetStaticIP(ipAddr.IP)
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
} else {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := ipAddr.IP.To4()
ip6 := ipAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
}
if ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(ipAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
} }
// Adding websocket multiaddresses // Adding websocket multiaddresses
@ -222,26 +245,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
return err return err
} }
// TODO: make this optional depending on DNS Disc being enabled err = w.updateLocalNode(w.localNode, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.opts.discV5autoUpdate, w.log)
if w.opts.privKey != nil && w.localNode != nil { if err != nil {
err := w.updateLocalNode(w.localNode, w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log) w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
if err != nil { return err
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
// Restarting DiscV5
discV5 := w.DiscV5()
if discV5 != nil && discV5.IsStarted() {
w.log.Info("restarting discv5")
w.discoveryV5.Stop()
err = w.discoveryV5.Start(ctx)
if err != nil {
w.log.Error("could not restart discv5", zap.Error(err))
return err
}
}
}
} }
return nil return nil

View File

@ -83,8 +83,6 @@ type WakuNode struct {
localNode *enode.LocalNode localNode *enode.LocalNode
addrChan chan ma.Multiaddr
bcaster v2.Broadcaster bcaster v2.Broadcaster
connectionNotif ConnectionNotifier connectionNotif ConnectionNotifier
@ -166,7 +164,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts = params w.opts = params
w.log = params.logger.Named("node2") w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{} w.wg = &sync.WaitGroup{}
w.addrChan = make(chan ma.Multiaddr, 1024)
w.keepAliveFails = make(map[peer.ID]int) w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay) w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
@ -231,14 +228,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
return w, nil return w, nil
} }
func (w *WakuNode) onAddrChange() { func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) {
for m := range w.addrChan {
_ = m
// TODO: determine if still needed. Otherwise remove
}
}
func (w *WakuNode) checkForAddressChanges(ctx context.Context) {
defer w.wg.Done() defer w.wg.Done()
addrs := w.ListenAddresses() addrs := w.ListenAddresses()
@ -247,7 +237,6 @@ func (w *WakuNode) checkForAddressChanges(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
close(w.addrChan)
return return
case <-first: case <-first:
w.log.Info("listening", logging.MultiAddrs("multiaddr", addrs...)) w.log.Info("listening", logging.MultiAddrs("multiaddr", addrs...))
@ -267,9 +256,6 @@ func (w *WakuNode) checkForAddressChanges(ctx context.Context) {
if diff { if diff {
addrs = newAddrs addrs = newAddrs
w.log.Info("listening addresses update received", logging.MultiAddrs("multiaddr", addrs...)) w.log.Info("listening addresses update received", logging.MultiAddrs("multiaddr", addrs...))
for _, addr := range addrs {
w.addrChan <- addr
}
_ = w.setupENR(ctx, addrs) _ = w.setupENR(ctx, addrs)
} }
} }
@ -284,10 +270,10 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log) w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log)
w.host.Network().Notify(w.connectionNotif) w.host.Network().Notify(w.connectionNotif)
w.wg.Add(2) w.wg.Add(3)
go w.connectednessListener(ctx) go w.connectednessListener(ctx)
go w.checkForAddressChanges(ctx) go w.watchMultiaddressChanges(ctx)
go w.onAddrChange() go w.watchENRChanges(ctx)
if w.opts.keepAliveInterval > time.Duration(0) { if w.opts.keepAliveInterval > time.Duration(0) {
w.wg.Add(1) w.wg.Add(1)
@ -408,6 +394,31 @@ func (w *WakuNode) ID() string {
return w.host.ID().Pretty() return w.host.ID().Pretty()
} }
func (w *WakuNode) watchENRChanges(ctx context.Context) {
defer w.wg.Done()
timer := time.NewTicker(1 * time.Second)
var prevNodeVal string
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if w.localNode != nil {
currNodeVal := w.localNode.Node().String()
if prevNodeVal != currNodeVal {
if prevNodeVal == "" {
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
} else {
w.log.Info("new enr record", logging.ENode("enr", w.localNode.Node()))
}
prevNodeVal = currNodeVal
}
}
}
}
}
// ListenAddresses returns all the multiaddresses used by the host // ListenAddresses returns all the multiaddresses used by the host
func (w *WakuNode) ListenAddresses() []ma.Multiaddr { func (w *WakuNode) ListenAddresses() []ma.Multiaddr {
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty())) hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty()))

View File

@ -75,7 +75,7 @@ type WakuNodeParameters struct {
swapPaymentThreshold int swapPaymentThreshold int
enableDiscV5 bool enableDiscV5 bool
udpPort int udpPort uint
discV5bootnodes []*enode.Node discV5bootnodes []*enode.Node
discV5Opts []pubsub.DiscoverOpt discV5Opts []pubsub.DiscoverOpt
discV5autoUpdate bool discV5autoUpdate bool
@ -282,7 +282,7 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)
} }
// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery // WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableDiscV5 = true params.enableDiscV5 = true
params.udpPort = udpPort params.udpPort = udpPort

View File

@ -105,7 +105,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
ip1, _ := extractIP(host1.Addrs()[0]) ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(udpPort1)) d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
require.NoError(t, err) require.NoError(t, err)
// H2 // H2
@ -115,7 +115,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(udpPort2), discv5.WithBootnodes([]*enode.Node{d1.Node()})) d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
require.NoError(t, err) require.NoError(t, err)
// H3 // H3