feat: single point localnode parametrization

This commit is contained in:
Igor Sirotin 2025-12-08 20:56:57 +00:00
parent 288f5c6259
commit 80c0b44561
No known key found for this signature in database
GPG Key ID: 0EABBCB40CB9AD4A
5 changed files with 249 additions and 218 deletions

View File

@ -6,173 +6,23 @@ import (
"net"
"strconv"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"go.uber.org/zap"
ndoeutils "github.com/waku-org/go-waku/waku/v2/node/utils"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool) error {
var options []wenr.ENROption
options = append(options, wenr.WithUDPPort(udpPort))
options = append(options, wenr.WithWakuBitfield(wakuFlags))
// Reset ENR fields
wenr.DeleteField(localnode, wenr.MultiaddrENRField)
wenr.DeleteField(localnode, enr.TCP(0).ENRKey())
wenr.DeleteField(localnode, enr.IPv4{}.ENRKey())
wenr.DeleteField(localnode, enr.IPv6{}.ENRKey())
if advertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
ipAddr, err := selectMostExternalAddress(advertiseAddr)
if err != nil {
return err
}
options = append(options, wenr.WithIP(ipAddr))
} else if !shouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
options = append(options, wenr.WithIP(ipAddr))
} else {
if ipAddr.Port != 0 {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := ipAddr.IP.To4()
ip6 := ipAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.SetFallbackIP(ip4)
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
}
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(ipAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
}
// Writing the IP + Port has priority over writting the multiaddress which might fail or not
// depending on the enr having space
options = append(options, wenr.WithMultiaddress(multiaddrs...))
return wenr.Update(w.log, localnode, options...)
}
func isPrivate(addr *net.TCPAddr) bool {
return addr.IP.IsPrivate()
}
func isExternal(addr *net.TCPAddr) bool {
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
}
func isLoopback(addr *net.TCPAddr) bool {
return addr.IP.IsLoopback()
}
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}
func extractIPAddressForENR(addr ma.Multiaddr) (*net.TCPAddr, error) {
// It's a p2p-circuit address. We shouldnt use these
// for building the ENR record default keys
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
if err == nil {
return nil, errors.New("can't use IP address from a p2p-circuit address")
}
// ws and wss addresses are handled by the multiaddr key
// they shouldnt be used for building the ENR record default keys
_, err = addr.ValueForProtocol(ma.P_WS)
if err == nil {
return nil, errors.New("can't use IP address from a ws address")
}
_, err = addr.ValueForProtocol(ma.P_WSS)
if err == nil {
return nil, errors.New("can't use IP address from a wss address")
}
var ipStr string
dns4, err := addr.ValueForProtocol(ma.P_DNS4)
if err != nil {
ipStr, err = addr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ipStr = netIP.String()
}
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}
func selectMostExternalAddress(addresses []ma.Multiaddr) (*net.TCPAddr, error) {
var ipAddrs []*net.TCPAddr
for _, addr := range addresses {
ipAddr, err := extractIPAddressForENR(addr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, ipAddr)
}
externalIPs := filterIP(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0], nil
}
privateIPs := filterIP(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0], nil
}
loopback := filterIP(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0], nil
}
return nil, errors.New("could not obtain ip address")
func (w *WakuNode) updateLocalNode() error {
w.localNodeMutex.Lock()
defer w.localNodeMutex.Unlock()
return enr.UpdateLocalNode(w.log, w.localNode, &w.localNodeParams)
}
func decapsulateP2P(addr ma.Multiaddr) (ma.Multiaddr, error) {
@ -282,7 +132,7 @@ func filter0Port(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
}
func (w *WakuNode) getENRAddresses(ctx context.Context, addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) {
extAddr, err = selectMostExternalAddress(addrs)
extAddr, err = ndoeutils.SelectMostExternalAddress(addrs)
if err != nil {
return nil, nil, err
}
@ -320,7 +170,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
return err
}
err = w.updateLocalNode(w.localNode, multiaddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddrs, w.opts.discV5autoUpdate)
w.localNodeParams.Multiaddrs = multiaddresses
w.localNodeParams.IPAddr = ipAddr
err = w.updateLocalNode()
if err != nil {
w.log.Error("updating localnode ENR record", zap.Error(err))
return err
@ -340,7 +193,8 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
}
func (w *WakuNode) SetRelayShards(rs protocol.RelayShards) error {
err := wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs))
w.localNodeParams.RelayShards = rs
err := w.updateLocalNode()
if err != nil {
return err
}
@ -396,7 +250,8 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
}
err = wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs[0]))
w.localNodeParams.RelayShards = rs[0]
err = w.updateLocalNode()
if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err))
continue

106
waku/v2/node/utils/utils.go Normal file
View File

@ -0,0 +1,106 @@
package utils
import (
"errors"
"net"
"strconv"
"github.com/multiformats/go-multiaddr"
)
func ExtractIPAddressForENR(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
// It's a p2p-circuit address. We shouldnt use these
// for building the ENR record default keys
_, err := addr.ValueForProtocol(multiaddr.P_CIRCUIT)
if err == nil {
return nil, errors.New("can't use IP address from a p2p-circuit address")
}
// ws and wss addresses are handled by the multiaddr key
// they shouldnt be used for building the ENR record default keys
_, err = addr.ValueForProtocol(multiaddr.P_WS)
if err == nil {
return nil, errors.New("can't use IP address from a ws address")
}
_, err = addr.ValueForProtocol(multiaddr.P_WSS)
if err == nil {
return nil, errors.New("can't use IP address from a wss address")
}
var ipStr string
dns4, err := addr.ValueForProtocol(multiaddr.P_DNS4)
if err != nil {
ipStr, err = addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ipStr = netIP.String()
}
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}
func SelectMostExternalAddress(addresses []multiaddr.Multiaddr) (*net.TCPAddr, error) {
var ipAddrs []*net.TCPAddr
for _, addr := range addresses {
ipAddr, err := ExtractIPAddressForENR(addr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, ipAddr)
}
externalIPs := filterIP(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0], nil
}
privateIPs := filterIP(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0], nil
}
loopback := filterIP(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0], nil
}
return nil, errors.New("could not obtain ip address")
}
func isPrivate(addr *net.TCPAddr) bool {
return addr.IP.IsPrivate()
}
func isExternal(addr *net.TCPAddr) bool {
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
}
func isLoopback(addr *net.TCPAddr) bool {
return addr.IP.IsLoopback()
}
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}

View File

@ -106,11 +106,20 @@ type WakuNode struct {
store *store.WakuStore
rlnRelay RLNRelay
wakuFlag enr.WakuEnrBitfield
circuitRelayNodes chan peer.AddrInfo
localNode *enode.LocalNode
// localNodeParams are ENR parameters that will be applied to the localnode.
localNodeParams enr.LocalNodeParams
// LocalNode.Set is a lazy operation that only stores the entry, but does not sign the record.
// But the size of the record is only checked during `sign`, and if it's >300 bytes, it will panic.
// In WithMultiaddress we attempt to write as much addresses as possible, relying on existing local node entries.
// On the other hand, enr.WithWakuRelaySharding is called in a goroutine, so ther is a race condition.
// To make it work properly, we should make sure that entries are not added during WithMultiaddress run.
localNodeMutex *sync.Mutex
bcaster relay.Broadcaster
connectionNotif ConnectionNotifier
@ -193,11 +202,18 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts = params
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
w.circuitRelayNodes = make(chan peer.AddrInfo)
w.metrics = newMetrics(params.prometheusReg)
w.metrics.RecordVersion(Version, GitCommit)
w.localNodeMutex = &sync.Mutex{}
w.localNodeParams = enr.LocalNodeParams{
WakuFlags: enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay),
UDPPort: w.opts.udpPort,
AdvertiseAddr: w.opts.advertiseAddrs,
ShouldAutoUpdate: w.opts.discV5autoUpdate,
}
// Setup peerstore wrapper
if params.peerstore != nil {
w.peerstore = wps.NewWakuPeerstore(params.peerstore)

View File

@ -1,15 +1,15 @@
package enr
import (
"fmt"
"net"
"testing"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func TestEnodeToMultiAddr(t *testing.T) {
@ -22,75 +22,51 @@ func TestEnodeToMultiAddr(t *testing.T) {
require.Equal(t, expectedMultiAddr, actualMultiAddr.String())
}
// TODO: this function is duplicated in localnode.go. Remove duplication
func updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool) error {
var options []ENROption
options = append(options, WithUDPPort(udpPort))
options = append(options, WithWakuBitfield(wakuFlags))
options = append(options, WithMultiaddress(multiaddrs...))
if advertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
nip := &net.TCPAddr{
IP: *advertiseAddr,
Port: ipAddr.Port,
}
options = append(options, WithIP(nip))
} else if !shouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
options = append(options, WithIP(ipAddr))
} else {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := ipAddr.IP.To4()
ip6 := ipAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
}
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(ipAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
return Update(utils.Logger(), localnode, options...)
}
func TestMultiaddr(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
key, _ := gcrypto.GenerateKey()
wakuFlag := NewWakuEnrBitfield(true, true, true, true)
//wss, _ := ma.NewMultiaddr("/dns4/www.somedomainname.com/tcp/443/wss")
wss, _ := ma.NewMultiaddr("/dns4/www.somedomainname.com/tcp/443/wss")
circuit1, _ := ma.NewMultiaddr("/dns4/node-02.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit2, _ := ma.NewMultiaddr("/dns4/node-01.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit3, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit4, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit5, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
circuit6, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
multiaddrValues := []ma.Multiaddr{
//wss,
wss,
circuit1,
circuit2,
circuit3,
circuit4,
circuit5,
circuit6,
}
db, _ := enode.OpenDB("")
localNode := enode.NewLocalNode(db, key)
err := updateLocalNode(localNode, multiaddrValues, &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000}, 50000, wakuFlag, nil, false)
err = UpdateLocalNode(logger, localNode, &LocalNodeParams{
Multiaddrs: multiaddrValues,
IPAddr: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000},
UDPPort: 50000,
WakuFlags: wakuFlag,
AdvertiseAddr: nil,
ShouldAutoUpdate: false,
})
require.NoError(t, err)
_ = localNode.Node() // Should not panic
require.NotPanics(t, func() {
_ = localNode.Node()
})
_, _, err = Multiaddress(localNode.Node())
peerID, maddrs, err := Multiaddress(localNode.Node())
require.NoError(t, err)
fmt.Println("peerID: ", peerID)
fmt.Println("len maddrs: ", len(maddrs))
fmt.Println("maddrs: ", maddrs)
}

View File

@ -13,6 +13,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"github.com/waku-org/go-waku/waku/v2/node/utils"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) {
@ -116,6 +119,8 @@ func WithUDPPort(udpPort uint) ENROption {
}
}
// Update applies the given ENR options to the localnode.
// This function should only be called from UpdateLocalNode, to ensure the order of options applied.
func Update(logger *zap.Logger, localnode *enode.LocalNode, enrOptions ...ENROption) error {
for _, opt := range enrOptions {
err := opt(localnode)
@ -151,3 +156,76 @@ func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []multiaddr.Mul
func DeleteField(localnode *enode.LocalNode, field string) {
localnode.Delete(enr.WithEntry(field, struct{}{}))
}
type LocalNodeParams struct {
Multiaddrs []multiaddr.Multiaddr
IPAddr *net.TCPAddr
UDPPort uint
WakuFlags WakuEnrBitfield
AdvertiseAddr []multiaddr.Multiaddr
ShouldAutoUpdate bool
RelayShards protocol.RelayShards
}
func UpdateLocalNode(logger *zap.Logger, localnode *enode.LocalNode, params *LocalNodeParams) error {
var options []ENROption
options = append(options, WithUDPPort(params.UDPPort))
options = append(options, WithWakuBitfield(params.WakuFlags))
// Reset ENR fields
DeleteField(localnode, MultiaddrENRField)
DeleteField(localnode, enr.TCP(0).ENRKey())
DeleteField(localnode, enr.IPv4{}.ENRKey())
DeleteField(localnode, enr.IPv6{}.ENRKey())
DeleteField(localnode, ShardingBitVectorEnrField)
DeleteField(localnode, ShardingIndicesListEnrField)
if params.AdvertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
ipAddr, err := utils.SelectMostExternalAddress(params.AdvertiseAddr)
if err != nil {
return err
}
options = append(options, WithIP(ipAddr))
} else if !params.ShouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
options = append(options, WithIP(params.IPAddr))
} else {
if params.IPAddr.Port != 0 {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := params.IPAddr.IP.To4()
ip6 := params.IPAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.SetFallbackIP(ip4)
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(params.IPAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
}
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(params.IPAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
}
options = append(options, WithWakuRelaySharding(params.RelayShards))
// Writing the IP + Port has priority over writing the multiaddress which might fail or not
// depending on the enr having space
options = append(options, WithMultiaddress(params.Multiaddrs...))
return Update(logger, localnode, options...)
}