mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-03 14:33:09 +00:00
feat: single point of localnode parametrization (#1297)
This commit is contained in:
parent
4b28d08451
commit
b0af7695bd
@ -6,173 +6,23 @@ import (
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
madns "github.com/multiformats/go-multiaddr-dns"
|
||||
"go.uber.org/zap"
|
||||
|
||||
ndoeutils "github.com/waku-org/go-waku/waku/v2/node/utils"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool) error {
|
||||
var options []wenr.ENROption
|
||||
options = append(options, wenr.WithUDPPort(udpPort))
|
||||
options = append(options, wenr.WithWakuBitfield(wakuFlags))
|
||||
|
||||
// Reset ENR fields
|
||||
wenr.DeleteField(localnode, wenr.MultiaddrENRField)
|
||||
wenr.DeleteField(localnode, enr.TCP(0).ENRKey())
|
||||
wenr.DeleteField(localnode, enr.IPv4{}.ENRKey())
|
||||
wenr.DeleteField(localnode, enr.IPv6{}.ENRKey())
|
||||
|
||||
if advertiseAddr != nil {
|
||||
// An advertised address disables libp2p address updates
|
||||
// and discv5 predictions
|
||||
ipAddr, err := selectMostExternalAddress(advertiseAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
options = append(options, wenr.WithIP(ipAddr))
|
||||
} else if !shouldAutoUpdate {
|
||||
// We received a libp2p address update. Autoupdate is disabled
|
||||
// Using a static ip will disable endpoint prediction.
|
||||
options = append(options, wenr.WithIP(ipAddr))
|
||||
} else {
|
||||
if ipAddr.Port != 0 {
|
||||
// We received a libp2p address update, but we should still
|
||||
// allow discv5 to update the enr record. We set the localnode
|
||||
// keys manually. It's possible that the ENR record might get
|
||||
// updated automatically
|
||||
ip4 := ipAddr.IP.To4()
|
||||
ip6 := ipAddr.IP.To16()
|
||||
if ip4 != nil && !ip4.IsUnspecified() {
|
||||
localnode.SetFallbackIP(ip4)
|
||||
localnode.Set(enr.IPv4(ip4))
|
||||
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
|
||||
} else {
|
||||
localnode.Delete(enr.IPv4{})
|
||||
localnode.Delete(enr.TCP(0))
|
||||
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
|
||||
}
|
||||
|
||||
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
|
||||
localnode.Set(enr.IPv6(ip6))
|
||||
localnode.Set(enr.TCP6(ipAddr.Port))
|
||||
} else {
|
||||
localnode.Delete(enr.IPv6{})
|
||||
localnode.Delete(enr.TCP6(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Writing the IP + Port has priority over writting the multiaddress which might fail or not
|
||||
// depending on the enr having space
|
||||
options = append(options, wenr.WithMultiaddress(multiaddrs...))
|
||||
|
||||
return wenr.Update(w.log, localnode, options...)
|
||||
}
|
||||
|
||||
func isPrivate(addr *net.TCPAddr) bool {
|
||||
return addr.IP.IsPrivate()
|
||||
}
|
||||
|
||||
func isExternal(addr *net.TCPAddr) bool {
|
||||
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
|
||||
}
|
||||
|
||||
func isLoopback(addr *net.TCPAddr) bool {
|
||||
return addr.IP.IsLoopback()
|
||||
}
|
||||
|
||||
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
|
||||
for _, s := range ss {
|
||||
if fn(s) {
|
||||
ret = append(ret, s)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func extractIPAddressForENR(addr ma.Multiaddr) (*net.TCPAddr, error) {
|
||||
// It's a p2p-circuit address. We shouldnt use these
|
||||
// for building the ENR record default keys
|
||||
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
|
||||
if err == nil {
|
||||
return nil, errors.New("can't use IP address from a p2p-circuit address")
|
||||
}
|
||||
|
||||
// ws and wss addresses are handled by the multiaddr key
|
||||
// they shouldnt be used for building the ENR record default keys
|
||||
_, err = addr.ValueForProtocol(ma.P_WS)
|
||||
if err == nil {
|
||||
return nil, errors.New("can't use IP address from a ws address")
|
||||
}
|
||||
_, err = addr.ValueForProtocol(ma.P_WSS)
|
||||
if err == nil {
|
||||
return nil, errors.New("can't use IP address from a wss address")
|
||||
}
|
||||
|
||||
var ipStr string
|
||||
dns4, err := addr.ValueForProtocol(ma.P_DNS4)
|
||||
if err != nil {
|
||||
ipStr, err = addr.ValueForProtocol(ma.P_IP4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
netIP, err := net.ResolveIPAddr("ip4", dns4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ipStr = netIP.String()
|
||||
}
|
||||
|
||||
portStr, err := addr.ValueForProtocol(ma.P_TCP)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
port, err := strconv.Atoi(portStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &net.TCPAddr{
|
||||
IP: net.ParseIP(ipStr),
|
||||
Port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func selectMostExternalAddress(addresses []ma.Multiaddr) (*net.TCPAddr, error) {
|
||||
var ipAddrs []*net.TCPAddr
|
||||
for _, addr := range addresses {
|
||||
ipAddr, err := extractIPAddressForENR(addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
ipAddrs = append(ipAddrs, ipAddr)
|
||||
}
|
||||
|
||||
externalIPs := filterIP(ipAddrs, isExternal)
|
||||
if len(externalIPs) > 0 {
|
||||
return externalIPs[0], nil
|
||||
}
|
||||
|
||||
privateIPs := filterIP(ipAddrs, isPrivate)
|
||||
if len(privateIPs) > 0 {
|
||||
return privateIPs[0], nil
|
||||
}
|
||||
|
||||
loopback := filterIP(ipAddrs, isLoopback)
|
||||
if len(loopback) > 0 {
|
||||
return loopback[0], nil
|
||||
}
|
||||
|
||||
return nil, errors.New("could not obtain ip address")
|
||||
func (w *WakuNode) updateLocalNode() error {
|
||||
w.localNodeMutex.Lock()
|
||||
defer w.localNodeMutex.Unlock()
|
||||
return enr.UpdateLocalNode(w.log, w.localNode, &w.localNodeParams)
|
||||
}
|
||||
|
||||
func decapsulateP2P(addr ma.Multiaddr) (ma.Multiaddr, error) {
|
||||
@ -282,7 +132,7 @@ func filter0Port(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
|
||||
}
|
||||
|
||||
func (w *WakuNode) getENRAddresses(ctx context.Context, addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) {
|
||||
extAddr, err = selectMostExternalAddress(addrs)
|
||||
extAddr, err = ndoeutils.SelectMostExternalAddress(addrs)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -320,7 +170,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
|
||||
return err
|
||||
}
|
||||
|
||||
err = w.updateLocalNode(w.localNode, multiaddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddrs, w.opts.discV5autoUpdate)
|
||||
w.localNodeParams.Multiaddrs = multiaddresses
|
||||
w.localNodeParams.IPAddr = ipAddr
|
||||
|
||||
err = w.updateLocalNode()
|
||||
if err != nil {
|
||||
w.log.Error("updating localnode ENR record", zap.Error(err))
|
||||
return err
|
||||
@ -340,7 +193,8 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
|
||||
}
|
||||
|
||||
func (w *WakuNode) SetRelayShards(rs protocol.RelayShards) error {
|
||||
err := wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs))
|
||||
w.localNodeParams.RelayShards = rs
|
||||
err := w.updateLocalNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -396,7 +250,8 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
|
||||
w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
|
||||
}
|
||||
|
||||
err = wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs[0]))
|
||||
w.localNodeParams.RelayShards = rs[0]
|
||||
err = w.updateLocalNode()
|
||||
if err != nil {
|
||||
w.log.Warn("could not set ENR shard info", zap.Error(err))
|
||||
continue
|
||||
|
||||
106
waku/v2/node/utils/utils.go
Normal file
106
waku/v2/node/utils/utils.go
Normal file
@ -0,0 +1,106 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func ExtractIPAddressForENR(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
|
||||
// It's a p2p-circuit address. We shouldnt use these
|
||||
// for building the ENR record default keys
|
||||
_, err := addr.ValueForProtocol(multiaddr.P_CIRCUIT)
|
||||
if err == nil {
|
||||
return nil, errors.New("can't use IP address from a p2p-circuit address")
|
||||
}
|
||||
|
||||
// ws and wss addresses are handled by the multiaddr key
|
||||
// they shouldnt be used for building the ENR record default keys
|
||||
_, err = addr.ValueForProtocol(multiaddr.P_WS)
|
||||
if err == nil {
|
||||
return nil, errors.New("can't use IP address from a ws address")
|
||||
}
|
||||
_, err = addr.ValueForProtocol(multiaddr.P_WSS)
|
||||
if err == nil {
|
||||
return nil, errors.New("can't use IP address from a wss address")
|
||||
}
|
||||
|
||||
var ipStr string
|
||||
dns4, err := addr.ValueForProtocol(multiaddr.P_DNS4)
|
||||
if err != nil {
|
||||
ipStr, err = addr.ValueForProtocol(multiaddr.P_IP4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
netIP, err := net.ResolveIPAddr("ip4", dns4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ipStr = netIP.String()
|
||||
}
|
||||
|
||||
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
port, err := strconv.Atoi(portStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &net.TCPAddr{
|
||||
IP: net.ParseIP(ipStr),
|
||||
Port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func SelectMostExternalAddress(addresses []multiaddr.Multiaddr) (*net.TCPAddr, error) {
|
||||
var ipAddrs []*net.TCPAddr
|
||||
for _, addr := range addresses {
|
||||
ipAddr, err := ExtractIPAddressForENR(addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
ipAddrs = append(ipAddrs, ipAddr)
|
||||
}
|
||||
|
||||
externalIPs := filterIP(ipAddrs, isExternal)
|
||||
if len(externalIPs) > 0 {
|
||||
return externalIPs[0], nil
|
||||
}
|
||||
|
||||
privateIPs := filterIP(ipAddrs, isPrivate)
|
||||
if len(privateIPs) > 0 {
|
||||
return privateIPs[0], nil
|
||||
}
|
||||
|
||||
loopback := filterIP(ipAddrs, isLoopback)
|
||||
if len(loopback) > 0 {
|
||||
return loopback[0], nil
|
||||
}
|
||||
|
||||
return nil, errors.New("could not obtain ip address")
|
||||
}
|
||||
|
||||
func isPrivate(addr *net.TCPAddr) bool {
|
||||
return addr.IP.IsPrivate()
|
||||
}
|
||||
|
||||
func isExternal(addr *net.TCPAddr) bool {
|
||||
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
|
||||
}
|
||||
|
||||
func isLoopback(addr *net.TCPAddr) bool {
|
||||
return addr.IP.IsLoopback()
|
||||
}
|
||||
|
||||
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
|
||||
for _, s := range ss {
|
||||
if fn(s) {
|
||||
ret = append(ret, s)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -106,11 +106,20 @@ type WakuNode struct {
|
||||
store *store.WakuStore
|
||||
rlnRelay RLNRelay
|
||||
|
||||
wakuFlag enr.WakuEnrBitfield
|
||||
circuitRelayNodes chan peer.AddrInfo
|
||||
|
||||
localNode *enode.LocalNode
|
||||
|
||||
// localNodeParams are ENR parameters that will be applied to the localnode.
|
||||
localNodeParams enr.LocalNodeParams
|
||||
|
||||
// LocalNode.Set is a lazy operation that only stores the entry, but does not sign the record.
|
||||
// But the size of the record is only checked during `sign`, and if it's >300 bytes, it will panic.
|
||||
// In WithMultiaddress we attempt to write as much addresses as possible, relying on existing local node entries.
|
||||
// On the other hand, enr.WithWakuRelaySharding is called in a goroutine, so ther is a race condition.
|
||||
// To make it work properly, we should make sure that entries are not added during WithMultiaddress run.
|
||||
localNodeMutex *sync.Mutex
|
||||
|
||||
bcaster relay.Broadcaster
|
||||
|
||||
connectionNotif ConnectionNotifier
|
||||
@ -193,11 +202,18 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
||||
w.opts = params
|
||||
w.log = params.logger.Named("node2")
|
||||
w.wg = &sync.WaitGroup{}
|
||||
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
|
||||
w.circuitRelayNodes = make(chan peer.AddrInfo)
|
||||
w.metrics = newMetrics(params.prometheusReg)
|
||||
w.metrics.RecordVersion(Version, GitCommit)
|
||||
|
||||
w.localNodeMutex = &sync.Mutex{}
|
||||
w.localNodeParams = enr.LocalNodeParams{
|
||||
WakuFlags: enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay),
|
||||
UDPPort: w.opts.udpPort,
|
||||
AdvertiseAddr: w.opts.advertiseAddrs,
|
||||
ShouldAutoUpdate: w.opts.discV5autoUpdate,
|
||||
}
|
||||
|
||||
// Setup peerstore wrapper
|
||||
if params.peerstore != nil {
|
||||
w.peerstore = wps.NewWakuPeerstore(params.peerstore)
|
||||
|
||||
@ -1,15 +1,15 @@
|
||||
package enr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
gcrypto "github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestEnodeToMultiAddr(t *testing.T) {
|
||||
@ -22,75 +22,51 @@ func TestEnodeToMultiAddr(t *testing.T) {
|
||||
require.Equal(t, expectedMultiAddr, actualMultiAddr.String())
|
||||
}
|
||||
|
||||
// TODO: this function is duplicated in localnode.go. Remove duplication
|
||||
|
||||
func updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool) error {
|
||||
var options []ENROption
|
||||
options = append(options, WithUDPPort(udpPort))
|
||||
options = append(options, WithWakuBitfield(wakuFlags))
|
||||
options = append(options, WithMultiaddress(multiaddrs...))
|
||||
|
||||
if advertiseAddr != nil {
|
||||
// An advertised address disables libp2p address updates
|
||||
// and discv5 predictions
|
||||
nip := &net.TCPAddr{
|
||||
IP: *advertiseAddr,
|
||||
Port: ipAddr.Port,
|
||||
}
|
||||
options = append(options, WithIP(nip))
|
||||
} else if !shouldAutoUpdate {
|
||||
// We received a libp2p address update. Autoupdate is disabled
|
||||
// Using a static ip will disable endpoint prediction.
|
||||
options = append(options, WithIP(ipAddr))
|
||||
} else {
|
||||
// We received a libp2p address update, but we should still
|
||||
// allow discv5 to update the enr record. We set the localnode
|
||||
// keys manually. It's possible that the ENR record might get
|
||||
// updated automatically
|
||||
ip4 := ipAddr.IP.To4()
|
||||
ip6 := ipAddr.IP.To16()
|
||||
if ip4 != nil && !ip4.IsUnspecified() {
|
||||
localnode.Set(enr.IPv4(ip4))
|
||||
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
|
||||
} else {
|
||||
localnode.Delete(enr.IPv4{})
|
||||
localnode.Delete(enr.TCP(0))
|
||||
}
|
||||
|
||||
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
|
||||
localnode.Set(enr.IPv6(ip6))
|
||||
localnode.Set(enr.TCP6(ipAddr.Port))
|
||||
} else {
|
||||
localnode.Delete(enr.IPv6{})
|
||||
localnode.Delete(enr.TCP6(0))
|
||||
}
|
||||
}
|
||||
|
||||
return Update(utils.Logger(), localnode, options...)
|
||||
}
|
||||
|
||||
func TestMultiaddr(t *testing.T) {
|
||||
logger, err := zap.NewDevelopment()
|
||||
require.NoError(t, err)
|
||||
|
||||
key, _ := gcrypto.GenerateKey()
|
||||
wakuFlag := NewWakuEnrBitfield(true, true, true, true)
|
||||
|
||||
//wss, _ := ma.NewMultiaddr("/dns4/www.somedomainname.com/tcp/443/wss")
|
||||
wss, _ := ma.NewMultiaddr("/dns4/www.somedomainname.com/tcp/443/wss")
|
||||
circuit1, _ := ma.NewMultiaddr("/dns4/node-02.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
|
||||
circuit2, _ := ma.NewMultiaddr("/dns4/node-01.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
|
||||
circuit3, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
|
||||
circuit4, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
|
||||
circuit5, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
|
||||
circuit6, _ := ma.NewMultiaddr("/dns4/node-03.gc-us-central1-a.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg/p2p-circuit")
|
||||
|
||||
multiaddrValues := []ma.Multiaddr{
|
||||
//wss,
|
||||
wss,
|
||||
circuit1,
|
||||
circuit2,
|
||||
circuit3,
|
||||
circuit4,
|
||||
circuit5,
|
||||
circuit6,
|
||||
}
|
||||
|
||||
db, _ := enode.OpenDB("")
|
||||
localNode := enode.NewLocalNode(db, key)
|
||||
err := updateLocalNode(localNode, multiaddrValues, &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000}, 50000, wakuFlag, nil, false)
|
||||
err = UpdateLocalNode(logger, localNode, &LocalNodeParams{
|
||||
Multiaddrs: multiaddrValues,
|
||||
IPAddr: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 241), Port: 60000},
|
||||
UDPPort: 50000,
|
||||
WakuFlags: wakuFlag,
|
||||
AdvertiseAddr: nil,
|
||||
ShouldAutoUpdate: false,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_ = localNode.Node() // Should not panic
|
||||
require.NotPanics(t, func() {
|
||||
_ = localNode.Node()
|
||||
})
|
||||
|
||||
_, _, err = Multiaddress(localNode.Node())
|
||||
peerID, maddrs, err := Multiaddress(localNode.Node())
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Println("peerID: ", peerID)
|
||||
fmt.Println("len maddrs: ", len(maddrs))
|
||||
fmt.Println("maddrs: ", maddrs)
|
||||
}
|
||||
|
||||
@ -13,6 +13,9 @@ import (
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/node/utils"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
)
|
||||
|
||||
func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) {
|
||||
@ -27,49 +30,60 @@ type ENROption func(*enode.LocalNode) error
|
||||
|
||||
func WithMultiaddress(multiaddrs ...multiaddr.Multiaddr) ENROption {
|
||||
return func(localnode *enode.LocalNode) (err error) {
|
||||
if len(multiaddrs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Randomly shuffle multiaddresses
|
||||
rand.Shuffle(len(multiaddrs), func(i, j int) { multiaddrs[i], multiaddrs[j] = multiaddrs[j], multiaddrs[i] })
|
||||
|
||||
// Testing how many multiaddresses we can write before we exceed the limit
|
||||
// By simulating what the localnode does when signing the enr, but without
|
||||
// causing a panic
|
||||
|
||||
privk, err := crypto.GenerateKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Adding extra multiaddresses. Should probably not exceed the enr max size of 300bytes
|
||||
failedOnceWritingENR := false
|
||||
couldWriteENRatLeastOnce := false
|
||||
successIdx := -1
|
||||
for i := len(multiaddrs); i > 0; i-- {
|
||||
cpy := localnode.Node().Record() // Record() creates a copy for the current iteration
|
||||
cpy.Set(enr.WithEntry(MultiaddrENRField, marshalMultiaddress(multiaddrs[0:i])))
|
||||
cpy.SetSeq(localnode.Seq() + 1)
|
||||
err = enode.SignV4(cpy, privk)
|
||||
if err == nil {
|
||||
couldWriteENRatLeastOnce = true
|
||||
successIdx = i
|
||||
break
|
||||
}
|
||||
failedOnceWritingENR = true
|
||||
}
|
||||
|
||||
if failedOnceWritingENR && couldWriteENRatLeastOnce {
|
||||
// Could write a subset of multiaddresses but not all
|
||||
writeMultiaddressField(localnode, multiaddrs[0:successIdx])
|
||||
// Find the maximum number of multiaddresses that fit in the ENR size limit
|
||||
maxFittingCount := findMaxFittingMultiaddrs(localnode, multiaddrs)
|
||||
if maxFittingCount == 0 {
|
||||
return errors.New("no multiaddress fit into ENR")
|
||||
}
|
||||
|
||||
writeMultiaddressField(localnode, multiaddrs[0:maxFittingCount])
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithCapabilities(lightpush, filter, store, relay bool) ENROption {
|
||||
return func(localnode *enode.LocalNode) (err error) {
|
||||
wakuflags := NewWakuEnrBitfield(lightpush, filter, store, relay)
|
||||
return WithWakuBitfield(wakuflags)(localnode)
|
||||
// findMaxFittingMultiaddrs determines how many multiaddresses can fit in the ENR
|
||||
func findMaxFittingMultiaddrs(localnode *enode.LocalNode, multiaddrs []multiaddr.Multiaddr) int {
|
||||
privk, err := crypto.GenerateKey()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Get the current committed record (after the Node() call above)
|
||||
currentRecord := localnode.Node().Record()
|
||||
|
||||
// Binary search for optimal count
|
||||
maxFitting := 0
|
||||
|
||||
for i := len(multiaddrs); i > 0; i-- {
|
||||
if canFitMultiaddrsOnRecord(currentRecord, multiaddrs[0:i], privk, localnode.Seq()) {
|
||||
// Return as soon as we can fit most of the addresses
|
||||
return i
|
||||
}
|
||||
}
|
||||
|
||||
return maxFitting
|
||||
}
|
||||
|
||||
// canFitMultiaddrsOnRecord tests if multiaddresses can fit on a specific record.
|
||||
// ENR has a limit of 300 bytes. Later it will panic on signing, if the record is over the size limit.
|
||||
// By simulating what the localnode does when signing the enr, but without causing a panic.
|
||||
func canFitMultiaddrsOnRecord(baseRecord *enr.Record, addrs []multiaddr.Multiaddr, privk *ecdsa.PrivateKey, seq uint64) bool {
|
||||
// Create a copy of the base record
|
||||
testRecord := *baseRecord
|
||||
|
||||
// Add the multiaddress field
|
||||
testRecord.Set(enr.WithEntry(MultiaddrENRField, marshalMultiaddress(addrs)))
|
||||
testRecord.SetSeq(seq + 1)
|
||||
|
||||
// Try to sign - this will return an error if the record is too large
|
||||
return enode.SignV4(&testRecord, privk) == nil
|
||||
}
|
||||
|
||||
func WithWakuBitfield(flags WakuEnrBitfield) ENROption {
|
||||
@ -105,6 +119,8 @@ func WithUDPPort(udpPort uint) ENROption {
|
||||
}
|
||||
}
|
||||
|
||||
// Update applies the given ENR options to the localnode.
|
||||
// This function should only be called from UpdateLocalNode, to ensure the order of options applied.
|
||||
func Update(logger *zap.Logger, localnode *enode.LocalNode, enrOptions ...ENROption) error {
|
||||
for _, opt := range enrOptions {
|
||||
err := opt(localnode)
|
||||
@ -140,3 +156,76 @@ func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []multiaddr.Mul
|
||||
func DeleteField(localnode *enode.LocalNode, field string) {
|
||||
localnode.Delete(enr.WithEntry(field, struct{}{}))
|
||||
}
|
||||
|
||||
type LocalNodeParams struct {
|
||||
Multiaddrs []multiaddr.Multiaddr
|
||||
IPAddr *net.TCPAddr
|
||||
UDPPort uint
|
||||
WakuFlags WakuEnrBitfield
|
||||
AdvertiseAddr []multiaddr.Multiaddr
|
||||
ShouldAutoUpdate bool
|
||||
RelayShards protocol.RelayShards
|
||||
}
|
||||
|
||||
func UpdateLocalNode(logger *zap.Logger, localnode *enode.LocalNode, params *LocalNodeParams) error {
|
||||
var options []ENROption
|
||||
options = append(options, WithUDPPort(params.UDPPort))
|
||||
options = append(options, WithWakuBitfield(params.WakuFlags))
|
||||
|
||||
// Reset ENR fields
|
||||
DeleteField(localnode, MultiaddrENRField)
|
||||
DeleteField(localnode, enr.TCP(0).ENRKey())
|
||||
DeleteField(localnode, enr.IPv4{}.ENRKey())
|
||||
DeleteField(localnode, enr.IPv6{}.ENRKey())
|
||||
DeleteField(localnode, ShardingBitVectorEnrField)
|
||||
DeleteField(localnode, ShardingIndicesListEnrField)
|
||||
|
||||
if params.AdvertiseAddr != nil {
|
||||
// An advertised address disables libp2p address updates
|
||||
// and discv5 predictions
|
||||
ipAddr, err := utils.SelectMostExternalAddress(params.AdvertiseAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
options = append(options, WithIP(ipAddr))
|
||||
} else if !params.ShouldAutoUpdate {
|
||||
// We received a libp2p address update. Autoupdate is disabled
|
||||
// Using a static ip will disable endpoint prediction.
|
||||
options = append(options, WithIP(params.IPAddr))
|
||||
} else {
|
||||
if params.IPAddr.Port != 0 {
|
||||
// We received a libp2p address update, but we should still
|
||||
// allow discv5 to update the enr record. We set the localnode
|
||||
// keys manually. It's possible that the ENR record might get
|
||||
// updated automatically
|
||||
ip4 := params.IPAddr.IP.To4()
|
||||
ip6 := params.IPAddr.IP.To16()
|
||||
if ip4 != nil && !ip4.IsUnspecified() {
|
||||
localnode.SetFallbackIP(ip4)
|
||||
localnode.Set(enr.IPv4(ip4))
|
||||
localnode.Set(enr.TCP(uint16(params.IPAddr.Port)))
|
||||
} else {
|
||||
localnode.Delete(enr.IPv4{})
|
||||
localnode.Delete(enr.TCP(0))
|
||||
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
|
||||
}
|
||||
|
||||
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
|
||||
localnode.Set(enr.IPv6(ip6))
|
||||
localnode.Set(enr.TCP6(params.IPAddr.Port))
|
||||
} else {
|
||||
localnode.Delete(enr.IPv6{})
|
||||
localnode.Delete(enr.TCP6(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
options = append(options, WithWakuRelaySharding(params.RelayShards))
|
||||
|
||||
// Writing the IP + Port has priority over writing the multiaddress which might fail or not
|
||||
// depending on the enr having space
|
||||
options = append(options, WithMultiaddress(params.Multiaddrs...))
|
||||
|
||||
return Update(logger, localnode, options...)
|
||||
}
|
||||
|
||||
@ -1,10 +1,9 @@
|
||||
package enr
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
)
|
||||
|
||||
@ -43,21 +42,6 @@ func WithWakuRelaySharding(rs protocol.RelayShards) ENROption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithWakuRelayShardingTopics(topics ...string) ENROption {
|
||||
return func(localnode *enode.LocalNode) error {
|
||||
rs, err := protocol.TopicsToRelayShards(topics...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(rs) != 1 {
|
||||
return errors.New("expected a single RelayShards")
|
||||
}
|
||||
|
||||
return WithWakuRelaySharding(rs[0])(localnode)
|
||||
}
|
||||
}
|
||||
|
||||
// ENR record accessors
|
||||
|
||||
func RelayShardList(record *enr.Record) (*protocol.RelayShards, error) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user