From 93331b483e2f5a599c1e9d31691151bb39422222 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Thu, 20 Jun 2024 03:20:15 -0400 Subject: [PATCH] fix: do not write tcp port 0 and remove 100 chars length limit for multiaddresses (#1129) --- Makefile | 6 +- waku/v2/discv5/discover_test.go | 9 ++- waku/v2/node/localnode.go | 81 ++++++++++++++------- waku/v2/peermanager/peer_manager_test.go | 2 +- waku/v2/protocol/enr/enr.go | 14 +++- waku/v2/protocol/enr/enr_test.go | 3 +- waku/v2/protocol/enr/localnode.go | 17 +++-- waku/v2/service/common_discovery_service.go | 24 ++++-- 8 files changed, 108 insertions(+), 48 deletions(-) diff --git a/Makefile b/Makefile index 1a7bde82..03f00c5e 100644 --- a/Makefile +++ b/Makefile @@ -66,15 +66,15 @@ vendor: lint-install: curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | \ - bash -s -- -b $(shell ${GOCMD} env GOPATH)/bin v1.52.2 + bash -s -- -b $(shell ${GOCMD} env GOPATH)/bin v1.59.1 lint: @echo "lint" - @golangci-lint run ./... --deadline=5m + @golangci-lint run ./... lint-full: @echo "lint" - @golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m + @golangci-lint run ./... --config=./.golangci.full.yaml test-with-race: ${GOCMD} test -race -timeout 300s ./waku/... ./cmd/waku/server/... diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index f7fc461c..8a23ae69 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -2,13 +2,14 @@ package discv5 import ( "context" + "testing" + "time" + dto "github.com/prometheus/client_model/go" wps "github.com/waku-org/go-waku/waku/v2/peerstore" wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" - "testing" - "time" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/prometheus/client_golang/prometheus" @@ -255,7 +256,7 @@ func TestDiscV5WithShardFilter(t *testing.T) { require.NoError(t, err) // Update node with shard info - err = wenr.Update(l1, wenr.WithWakuRelaySharding(rs1[0])) + err = wenr.Update(utils.Logger(), l1, wenr.WithWakuRelaySharding(rs1[0])) require.NoError(t, err) // H2 @@ -271,7 +272,7 @@ func TestDiscV5WithShardFilter(t *testing.T) { d2.SetHost(host2) // Update second node with shard info used for first node as well - err = wenr.Update(l2, wenr.WithWakuRelaySharding(rs1[0])) + err = wenr.Update(utils.Logger(), l2, wenr.WithWakuRelaySharding(rs1[0])) require.NoError(t, err) // H3 diff --git a/waku/v2/node/localnode.go b/waku/v2/node/localnode.go index 1d12d535..163c46d2 100644 --- a/waku/v2/node/localnode.go +++ b/waku/v2/node/localnode.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/event" + "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" @@ -20,7 +21,6 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M var options []wenr.ENROption options = append(options, wenr.WithUDPPort(udpPort)) options = append(options, wenr.WithWakuBitfield(wakuFlags)) - options = append(options, wenr.WithMultiaddress(multiaddrs...)) if advertiseAddr != nil { // An advertised address disables libp2p address updates @@ -36,32 +36,38 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M // Using a static ip will disable endpoint prediction. options = append(options, wenr.WithIP(ipAddr)) } else { - // We received a libp2p address update, but we should still - // allow discv5 to update the enr record. We set the localnode - // keys manually. It's possible that the ENR record might get - // updated automatically - ip4 := ipAddr.IP.To4() - ip6 := ipAddr.IP.To16() - if ip4 != nil && !ip4.IsUnspecified() { - localnode.SetFallbackIP(ip4) - localnode.Set(enr.IPv4(ip4)) - localnode.Set(enr.TCP(uint16(ipAddr.Port))) - } else { - localnode.Delete(enr.IPv4{}) - localnode.Delete(enr.TCP(0)) - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - } + if ipAddr.Port != 0 { + // We received a libp2p address update, but we should still + // allow discv5 to update the enr record. We set the localnode + // keys manually. It's possible that the ENR record might get + // updated automatically + ip4 := ipAddr.IP.To4() + ip6 := ipAddr.IP.To16() + if ip4 != nil && !ip4.IsUnspecified() { + localnode.SetFallbackIP(ip4) + localnode.Set(enr.IPv4(ip4)) + localnode.Set(enr.TCP(uint16(ipAddr.Port))) + } else { + localnode.Delete(enr.IPv4{}) + localnode.Delete(enr.TCP(0)) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + } - if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() { - localnode.Set(enr.IPv6(ip6)) - localnode.Set(enr.TCP6(ipAddr.Port)) - } else { - localnode.Delete(enr.IPv6{}) - localnode.Delete(enr.TCP6(0)) + if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() { + localnode.Set(enr.IPv6(ip6)) + localnode.Set(enr.TCP6(ipAddr.Port)) + } else { + localnode.Delete(enr.IPv6{}) + localnode.Delete(enr.TCP6(0)) + } } } - return wenr.Update(localnode, options...) + // Writing the IP + Port has priority over writting the multiaddress which might fail or not + // depending on the enr having space + options = append(options, wenr.WithMultiaddress(multiaddrs...)) + + return wenr.Update(w.log, localnode, options...) } func isPrivate(addr *net.TCPAddr) bool { @@ -228,8 +234,28 @@ func selectCircuitRelayListenAddresses(addresses []ma.Multiaddr) ([]ma.Multiaddr return result, nil } -func (w *WakuNode) getENRAddresses(addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) { +func filter0Port(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) { + var result []ma.Multiaddr + for _, addr := range addresses { + portStr, err := addr.ValueForProtocol(ma.P_TCP) + if err != nil && !errors.Is(err, multiaddr.ErrProtocolNotFound) { + return nil, err + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + + if port != 0 { + result = append(result, addr) + } + } + + return result, nil +} + +func (w *WakuNode) getENRAddresses(addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) { extAddr, err = selectMostExternalAddress(addrs) if err != nil { return nil, nil, err @@ -253,6 +279,11 @@ func (w *WakuNode) getENRAddresses(addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr = append(multiaddr, wssAddrs...) } + multiaddr, err = filter0Port(multiaddr) + if err != nil { + return nil, nil, err + } + return } @@ -323,7 +354,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error { w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0])) } - err = wenr.Update(w.localNode, wenr.WithWakuRelaySharding(rs[0])) + err = wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs[0])) if err != nil { w.log.Warn("could not set ENR shard info", zap.Error(err)) continue diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 60fa2d0f..c63efc0d 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -250,7 +250,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF rs, err := wakuproto.TopicsToRelayShards(topic) require.NoError(t, err) - err = wenr.Update(localNode, wenr.WithWakuRelaySharding(rs[0])) + err = wenr.Update(utils.Logger(), localNode, wenr.WithWakuRelaySharding(rs[0])) require.NoError(t, err) pm := NewPeerManager(10, 20, nil, logger) pm.SetHost(host) diff --git a/waku/v2/protocol/enr/enr.go b/waku/v2/protocol/enr/enr.go index b5b7bbd9..1ff1265c 100644 --- a/waku/v2/protocol/enr/enr.go +++ b/waku/v2/protocol/enr/enr.go @@ -12,6 +12,8 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) +var ErrNoPortAvailable = errors.New("port not available") + // WakuENRField is the name of the ENR field that contains information about which protocols are supported by the node const WakuENRField = "waku2" @@ -59,6 +61,11 @@ func enodeToMultiAddr(node *enode.Node) (multiaddr.Multiaddr, error) { ipType := "ip4" portNumber := node.TCP() + + if portNumber == 0 { + return nil, ErrNoPortAvailable + } + if utils.IsIPv6(node.IP().String()) { ipType = "ip6" var port enr.TCP6 @@ -83,9 +90,12 @@ func Multiaddress(node *enode.Node) (peer.ID, []multiaddr.Multiaddr, error) { addr, err := enodeToMultiAddr(node) if err != nil { - return "", nil, err + if !errors.Is(err, ErrNoPortAvailable) { + return "", nil, err + } + } else { + result = append(result, addr) } - result = append(result, addr) var multiaddrRaw []byte if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil { diff --git a/waku/v2/protocol/enr/enr_test.go b/waku/v2/protocol/enr/enr_test.go index 0768fe82..83ec0f99 100644 --- a/waku/v2/protocol/enr/enr_test.go +++ b/waku/v2/protocol/enr/enr_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enr" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" ) func TestEnodeToMultiAddr(t *testing.T) { @@ -65,7 +66,7 @@ func updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAd } } - return Update(localnode, options...) + return Update(utils.Logger(), localnode, options...) } func TestMultiaddr(t *testing.T) { diff --git a/waku/v2/protocol/enr/localnode.go b/waku/v2/protocol/enr/localnode.go index a297d54c..a1896e41 100644 --- a/waku/v2/protocol/enr/localnode.go +++ b/waku/v2/protocol/enr/localnode.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/multiformats/go-multiaddr" + "go.uber.org/zap" ) func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) { @@ -71,6 +72,10 @@ func WithWakuBitfield(flags WakuEnrBitfield) ENROption { func WithIP(ipAddr *net.TCPAddr) ENROption { return func(localnode *enode.LocalNode) (err error) { + if ipAddr.Port == 0 { + return ErrNoPortAvailable + } + localnode.SetStaticIP(ipAddr.IP) localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6? return nil @@ -91,11 +96,15 @@ func WithUDPPort(udpPort uint) ENROption { } } -func Update(localnode *enode.LocalNode, enrOptions ...ENROption) error { +func Update(logger *zap.Logger, localnode *enode.LocalNode, enrOptions ...ENROption) error { for _, opt := range enrOptions { err := opt(localnode) if err != nil { - return err + if errors.Is(err, ErrNoPortAvailable) { + logger.Warn("no tcp port available. ENR will not contain tcp key") + } else { + return err + } } } return nil @@ -120,9 +129,7 @@ func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []multiaddr.Mul fieldRaw = append(fieldRaw, maRaw...) } - if len(fieldRaw) != 0 && len(fieldRaw) <= 100 { // Max length for multiaddr field before triggering the 300 bytes limit - localnode.Set(enr.WithEntry(MultiaddrENRField, fieldRaw)) - } + localnode.Set(enr.WithEntry(MultiaddrENRField, fieldRaw)) // This is to trigger the signing record err due to exceeding 300bytes limit _ = localnode.Node() diff --git a/waku/v2/service/common_discovery_service.go b/waku/v2/service/common_discovery_service.go index c5c12b64..0aa6b852 100644 --- a/waku/v2/service/common_discovery_service.go +++ b/waku/v2/service/common_discovery_service.go @@ -18,9 +18,11 @@ type PeerData struct { } type CommonDiscoveryService struct { - commonService *CommonService - channel chan PeerData - canWriteToChannel sync.Mutex + commonService *CommonService + + channel chan PeerData + channelIsClosed bool + channelMutex sync.Mutex } func NewCommonDiscoveryService() *CommonDiscoveryService { @@ -33,7 +35,10 @@ func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) er return sp.commonService.Start(ctx, func() error { // currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them // mutex protection for this operation + sp.channelMutex.Lock() sp.channel = make(chan PeerData) + sp.channelIsClosed = false + sp.channelMutex.Unlock() return fn() }) } @@ -43,9 +48,10 @@ func (sp *CommonDiscoveryService) Stop(stopFn func()) { stopFn() sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service. // there is a wait in the CommonService too - sp.canWriteToChannel.Lock() + sp.channelMutex.Lock() close(sp.channel) - sp.canWriteToChannel.Unlock() + sp.channelIsClosed = true + sp.channelMutex.Unlock() }) } func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData { @@ -56,8 +62,12 @@ func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool { return false } - sp.canWriteToChannel.Lock() - defer sp.canWriteToChannel.Unlock() + sp.channelMutex.Lock() + defer sp.channelMutex.Unlock() + + if sp.channelIsClosed { + return false + } select { case sp.channel <- data: