fix: do not write tcp port 0 and remove 100 chars length limit for multiaddresses (#1129)

This commit is contained in:
richΛrd 2024-06-20 03:20:15 -04:00 committed by GitHub
parent 795322a196
commit 93331b483e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 108 additions and 48 deletions

View File

@ -66,15 +66,15 @@ vendor:
lint-install: lint-install:
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | \ curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | \
bash -s -- -b $(shell ${GOCMD} env GOPATH)/bin v1.52.2 bash -s -- -b $(shell ${GOCMD} env GOPATH)/bin v1.59.1
lint: lint:
@echo "lint" @echo "lint"
@golangci-lint run ./... --deadline=5m @golangci-lint run ./...
lint-full: lint-full:
@echo "lint" @echo "lint"
@golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m @golangci-lint run ./... --config=./.golangci.full.yaml
test-with-race: test-with-race:
${GOCMD} test -race -timeout 300s ./waku/... ./cmd/waku/server/... ${GOCMD} test -race -timeout 300s ./waku/... ./cmd/waku/server/...

View File

@ -2,13 +2,14 @@ package discv5
import ( import (
"context" "context"
"testing"
"time"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap" "go.uber.org/zap"
"testing"
"time"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -255,7 +256,7 @@ func TestDiscV5WithShardFilter(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Update node with shard info // Update node with shard info
err = wenr.Update(l1, wenr.WithWakuRelaySharding(rs1[0])) err = wenr.Update(utils.Logger(), l1, wenr.WithWakuRelaySharding(rs1[0]))
require.NoError(t, err) require.NoError(t, err)
// H2 // H2
@ -271,7 +272,7 @@ func TestDiscV5WithShardFilter(t *testing.T) {
d2.SetHost(host2) d2.SetHost(host2)
// Update second node with shard info used for first node as well // Update second node with shard info used for first node as well
err = wenr.Update(l2, wenr.WithWakuRelaySharding(rs1[0])) err = wenr.Update(utils.Logger(), l2, wenr.WithWakuRelaySharding(rs1[0]))
require.NoError(t, err) require.NoError(t, err)
// H3 // H3

View File

@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/event"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
@ -20,7 +21,6 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M
var options []wenr.ENROption var options []wenr.ENROption
options = append(options, wenr.WithUDPPort(udpPort)) options = append(options, wenr.WithUDPPort(udpPort))
options = append(options, wenr.WithWakuBitfield(wakuFlags)) options = append(options, wenr.WithWakuBitfield(wakuFlags))
options = append(options, wenr.WithMultiaddress(multiaddrs...))
if advertiseAddr != nil { if advertiseAddr != nil {
// An advertised address disables libp2p address updates // An advertised address disables libp2p address updates
@ -36,6 +36,7 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M
// Using a static ip will disable endpoint prediction. // Using a static ip will disable endpoint prediction.
options = append(options, wenr.WithIP(ipAddr)) options = append(options, wenr.WithIP(ipAddr))
} else { } else {
if ipAddr.Port != 0 {
// We received a libp2p address update, but we should still // We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode // allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get // keys manually. It's possible that the ENR record might get
@ -60,8 +61,13 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M
localnode.Delete(enr.TCP6(0)) localnode.Delete(enr.TCP6(0))
} }
} }
}
return wenr.Update(localnode, options...) // Writing the IP + Port has priority over writting the multiaddress which might fail or not
// depending on the enr having space
options = append(options, wenr.WithMultiaddress(multiaddrs...))
return wenr.Update(w.log, localnode, options...)
} }
func isPrivate(addr *net.TCPAddr) bool { func isPrivate(addr *net.TCPAddr) bool {
@ -228,8 +234,28 @@ func selectCircuitRelayListenAddresses(addresses []ma.Multiaddr) ([]ma.Multiaddr
return result, nil return result, nil
} }
func (w *WakuNode) getENRAddresses(addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) { func filter0Port(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
var result []ma.Multiaddr
for _, addr := range addresses {
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil && !errors.Is(err, multiaddr.ErrProtocolNotFound) {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
if port != 0 {
result = append(result, addr)
}
}
return result, nil
}
func (w *WakuNode) getENRAddresses(addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) {
extAddr, err = selectMostExternalAddress(addrs) extAddr, err = selectMostExternalAddress(addrs)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -253,6 +279,11 @@ func (w *WakuNode) getENRAddresses(addrs []ma.Multiaddr) (extAddr *net.TCPAddr,
multiaddr = append(multiaddr, wssAddrs...) multiaddr = append(multiaddr, wssAddrs...)
} }
multiaddr, err = filter0Port(multiaddr)
if err != nil {
return nil, nil, err
}
return return
} }
@ -323,7 +354,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0])) w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
} }
err = wenr.Update(w.localNode, wenr.WithWakuRelaySharding(rs[0])) err = wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs[0]))
if err != nil { if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err)) w.log.Warn("could not set ENR shard info", zap.Error(err))
continue continue

View File

@ -250,7 +250,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF
rs, err := wakuproto.TopicsToRelayShards(topic) rs, err := wakuproto.TopicsToRelayShards(topic)
require.NoError(t, err) require.NoError(t, err)
err = wenr.Update(localNode, wenr.WithWakuRelaySharding(rs[0])) err = wenr.Update(utils.Logger(), localNode, wenr.WithWakuRelaySharding(rs[0]))
require.NoError(t, err) require.NoError(t, err)
pm := NewPeerManager(10, 20, nil, logger) pm := NewPeerManager(10, 20, nil, logger)
pm.SetHost(host) pm.SetHost(host)

View File

@ -12,6 +12,8 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
) )
var ErrNoPortAvailable = errors.New("port not available")
// WakuENRField is the name of the ENR field that contains information about which protocols are supported by the node // WakuENRField is the name of the ENR field that contains information about which protocols are supported by the node
const WakuENRField = "waku2" const WakuENRField = "waku2"
@ -59,6 +61,11 @@ func enodeToMultiAddr(node *enode.Node) (multiaddr.Multiaddr, error) {
ipType := "ip4" ipType := "ip4"
portNumber := node.TCP() portNumber := node.TCP()
if portNumber == 0 {
return nil, ErrNoPortAvailable
}
if utils.IsIPv6(node.IP().String()) { if utils.IsIPv6(node.IP().String()) {
ipType = "ip6" ipType = "ip6"
var port enr.TCP6 var port enr.TCP6
@ -83,9 +90,12 @@ func Multiaddress(node *enode.Node) (peer.ID, []multiaddr.Multiaddr, error) {
addr, err := enodeToMultiAddr(node) addr, err := enodeToMultiAddr(node)
if err != nil { if err != nil {
if !errors.Is(err, ErrNoPortAvailable) {
return "", nil, err return "", nil, err
} }
} else {
result = append(result, addr) result = append(result, addr)
}
var multiaddrRaw []byte var multiaddrRaw []byte
if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil { if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil {

View File

@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
) )
func TestEnodeToMultiAddr(t *testing.T) { func TestEnodeToMultiAddr(t *testing.T) {
@ -65,7 +66,7 @@ func updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAd
} }
} }
return Update(localnode, options...) return Update(utils.Logger(), localnode, options...)
} }
func TestMultiaddr(t *testing.T) { func TestMultiaddr(t *testing.T) {

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/enr"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
) )
func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) { func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) {
@ -71,6 +72,10 @@ func WithWakuBitfield(flags WakuEnrBitfield) ENROption {
func WithIP(ipAddr *net.TCPAddr) ENROption { func WithIP(ipAddr *net.TCPAddr) ENROption {
return func(localnode *enode.LocalNode) (err error) { return func(localnode *enode.LocalNode) (err error) {
if ipAddr.Port == 0 {
return ErrNoPortAvailable
}
localnode.SetStaticIP(ipAddr.IP) localnode.SetStaticIP(ipAddr.IP)
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6? localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
return nil return nil
@ -91,13 +96,17 @@ func WithUDPPort(udpPort uint) ENROption {
} }
} }
func Update(localnode *enode.LocalNode, enrOptions ...ENROption) error { func Update(logger *zap.Logger, localnode *enode.LocalNode, enrOptions ...ENROption) error {
for _, opt := range enrOptions { for _, opt := range enrOptions {
err := opt(localnode) err := opt(localnode)
if err != nil { if err != nil {
if errors.Is(err, ErrNoPortAvailable) {
logger.Warn("no tcp port available. ENR will not contain tcp key")
} else {
return err return err
} }
} }
}
return nil return nil
} }
@ -120,9 +129,7 @@ func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []multiaddr.Mul
fieldRaw = append(fieldRaw, maRaw...) fieldRaw = append(fieldRaw, maRaw...)
} }
if len(fieldRaw) != 0 && len(fieldRaw) <= 100 { // Max length for multiaddr field before triggering the 300 bytes limit
localnode.Set(enr.WithEntry(MultiaddrENRField, fieldRaw)) localnode.Set(enr.WithEntry(MultiaddrENRField, fieldRaw))
}
// This is to trigger the signing record err due to exceeding 300bytes limit // This is to trigger the signing record err due to exceeding 300bytes limit
_ = localnode.Node() _ = localnode.Node()

View File

@ -19,8 +19,10 @@ type PeerData struct {
type CommonDiscoveryService struct { type CommonDiscoveryService struct {
commonService *CommonService commonService *CommonService
channel chan PeerData channel chan PeerData
canWriteToChannel sync.Mutex channelIsClosed bool
channelMutex sync.Mutex
} }
func NewCommonDiscoveryService() *CommonDiscoveryService { func NewCommonDiscoveryService() *CommonDiscoveryService {
@ -33,7 +35,10 @@ func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) er
return sp.commonService.Start(ctx, func() error { return sp.commonService.Start(ctx, func() error {
// currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them // currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them
// mutex protection for this operation // mutex protection for this operation
sp.channelMutex.Lock()
sp.channel = make(chan PeerData) sp.channel = make(chan PeerData)
sp.channelIsClosed = false
sp.channelMutex.Unlock()
return fn() return fn()
}) })
} }
@ -43,9 +48,10 @@ func (sp *CommonDiscoveryService) Stop(stopFn func()) {
stopFn() stopFn()
sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service. sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service.
// there is a wait in the CommonService too // there is a wait in the CommonService too
sp.canWriteToChannel.Lock() sp.channelMutex.Lock()
close(sp.channel) close(sp.channel)
sp.canWriteToChannel.Unlock() sp.channelIsClosed = true
sp.channelMutex.Unlock()
}) })
} }
func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData { func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData {
@ -56,8 +62,12 @@ func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool {
return false return false
} }
sp.canWriteToChannel.Lock() sp.channelMutex.Lock()
defer sp.canWriteToChannel.Unlock() defer sp.channelMutex.Unlock()
if sp.channelIsClosed {
return false
}
select { select {
case sp.channel <- data: case sp.channel <- data: