fix: close connections when maximum number of peers is reached

This commit is contained in:
Richard Ramos 2023-01-16 18:11:56 -04:00 committed by RichΛrd
parent 7de1753549
commit 0b6eddf67d
8 changed files with 212 additions and 146 deletions

View File

@ -1 +1 @@
0.122.3 0.123.0

2
go.mod
View File

@ -77,7 +77,7 @@ require (
github.com/gorilla/sessions v1.2.1 github.com/gorilla/sessions v1.2.1
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/rmg/iso4217 v1.0.0 github.com/rmg/iso4217 v1.0.0
github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d
) )
require ( require (

4
go.sum
View File

@ -2067,8 +2067,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb h1:66ae/38EApilmfSMMx49ySuM5T1LCNEbra4LTRbDPw0= github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d h1:W+I90Y9avF506cmqr4jZX+HeKf5+MY8SMQRZhMR/k9g=
github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb/go.mod h1:sI14mN/sM8inIb2x2b462wydSEFyOyuDKI1cjiVIIpM= github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d/go.mod h1:sI14mN/sM8inIb2x2b462wydSEFyOyuDKI1cjiVIIpM=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg= github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk= github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk=
github.com/wealdtech/go-ens/v3 v3.5.0 h1:Huc9GxBgiGweCOGTYomvsg07K2QggAqZpZ5SuiZdC8o= github.com/wealdtech/go-ens/v3 v3.5.0 h1:Huc9GxBgiGweCOGTYomvsg07K2QggAqZpZ5SuiZdC8o=

View File

@ -20,16 +20,25 @@ import (
// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already // PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct { type PeerConnectionStrategy struct {
cache *lru.TwoQueueCache sync.RWMutex
host host.Host
cancel context.CancelFunc cache *lru.TwoQueueCache
host host.Host
cancel context.CancelFunc
paused bool
workerCtx context.Context
workerCancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
minPeers int minPeers int
dialTimeout time.Duration dialTimeout time.Duration
peerCh chan peer.AddrInfo peerCh chan peer.AddrInfo
backoff backoff.BackoffFactory dialCh chan peer.AddrInfo
mux sync.Mutex
logger *zap.Logger backoff backoff.BackoffFactory
mux sync.Mutex
logger *zap.Logger
} }
// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already. // NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
@ -51,7 +60,6 @@ func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTim
dialTimeout: dialTimeout, dialTimeout: dialTimeout,
backoff: backoff, backoff: backoff,
logger: logger.Named("discovery-connector"), logger: logger.Named("discovery-connector"),
peerCh: make(chan peer.AddrInfo),
}, nil }, nil
} }
@ -73,8 +81,12 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel c.cancel = cancel
c.peerCh = make(chan peer.AddrInfo)
c.dialCh = make(chan peer.AddrInfo)
c.wg.Add(1) c.wg.Add(3)
go c.shouldDialPeers(ctx)
go c.workPublisher(ctx)
go c.dialPeers(ctx) go c.dialPeers(ctx)
return nil return nil
@ -87,6 +99,85 @@ func (c *PeerConnectionStrategy) Stop() {
c.cancel() c.cancel()
c.wg.Wait() c.wg.Wait()
close(c.peerCh)
close(c.dialCh)
}
func (c *PeerConnectionStrategy) isPaused() bool {
c.RLock()
defer c.RUnlock()
return c.paused
}
func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
defer c.wg.Done()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
c.Lock()
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
isPaused := c.isPaused()
numPeers := len(c.host.Network().Peers())
if numPeers >= c.minPeers && !isPaused {
c.Lock()
c.paused = true
c.workerCancel()
c.Unlock()
} else if numPeers < c.minPeers && isPaused {
c.Lock()
c.paused = false
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()
}
}
}
}
func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
select {
case c.dialCh <- p:
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
return
}
}
func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
defer c.wg.Done()
for {
select {
case <-ctx.Done():
return
default:
isPaused := c.isPaused()
if !isPaused {
select {
case <-ctx.Done():
return
case p := <-c.peerCh:
c.publishWork(ctx, p)
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
break
}
} else {
// Check if paused again
time.Sleep(1 * time.Second)
}
}
}
} }
func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
@ -100,7 +191,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
sem := make(chan struct{}, maxGoRoutines) sem := make(chan struct{}, maxGoRoutines)
for { for {
select { select {
case pi, ok := <-c.peerCh: case pi, ok := <-c.dialCh:
if !ok { if !ok {
return return
} }
@ -133,34 +224,19 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
} }
sem <- struct{}{} sem <- struct{}{}
c.wg.Add(1)
go func(pi peer.AddrInfo) { go func(pi peer.AddrInfo) {
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout) defer c.wg.Done()
defer cancel()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi) err := c.host.Connect(ctx, pi)
if err != nil { if err != nil && !errors.Is(err, context.Canceled) {
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
} }
<-sem <-sem
}(pi) }(pi)
ticker := time.NewTicker(1 * time.Second)
peerCntLoop:
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(c.host.Network().Peers()) < c.minPeers {
ticker.Stop()
break peerCntLoop
}
}
}
case <-ctx.Done(): case <-ctx.Done():
if ctx.Err() != nil {
c.logger.Info("discovery: backoff connector context error", zap.Error(ctx.Err()))
}
return return
} }
} }

View File

@ -251,7 +251,7 @@ func (d *DiscoveryV5) Iterator() (enode.Iterator, error) {
func (d *DiscoveryV5) iterate(ctx context.Context) { func (d *DiscoveryV5) iterate(ctx context.Context) {
iterator, err := d.Iterator() iterator, err := d.Iterator()
if err != nil { if err != nil {
d.log.Error("obtaining iterator", zap.Error(err)) d.log.Debug("obtaining iterator", zap.Error(err))
return return
} }
@ -280,7 +280,11 @@ func (d *DiscoveryV5) iterate(ctx context.Context) {
} }
if len(peerAddrs) != 0 { if len(peerAddrs) != 0 {
d.peerConnector.PeerChannel() <- peerAddrs[0] select {
case <-ctx.Done():
return
case d.peerConnector.PeerChannel() <- peerAddrs[0]:
}
} }
} }
} }

View File

@ -90,9 +90,8 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx)) wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx))
wakuPX.log.Info("Peer exchange protocol started") wakuPX.log.Info("Peer exchange protocol started")
wakuPX.wg.Add(2) wakuPX.wg.Add(1)
go wakuPX.runPeerExchangeDiscv5Loop(ctx) go wakuPX.runPeerExchangeDiscv5Loop(ctx)
go wakuPX.handleNewPeers(ctx)
return nil return nil
} }
@ -129,7 +128,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
go func() { go func() {
defer wakuPX.wg.Done() defer wakuPX.wg.Done()
for _, p := range peers { for _, p := range peers {
wakuPX.peerCh <- p select {
case <-ctx.Done():
return
case wakuPX.peerConnector.PeerChannel() <- p:
}
} }
}() }()
} }
@ -137,19 +140,6 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
return nil return nil
} }
func (wakuPX *WakuPeerExchange) handleNewPeers(ctx context.Context) {
defer wakuPX.wg.Done()
for {
select {
case <-ctx.Done():
return
case p := <-wakuPX.peerCh:
wakuPX.peerConnector.PeerChannel() <- p
}
}
}
func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) { func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) { return func(s network.Stream) {
defer s.Close() defer s.Close()
@ -324,7 +314,7 @@ func (wakuPX *WakuPeerExchange) cleanCache() {
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) { func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) {
iterator, err := wakuPX.disc.Iterator() iterator, err := wakuPX.disc.Iterator()
if err != nil { if err != nil {
wakuPX.log.Error("obtaining iterator", zap.Error(err)) wakuPX.log.Debug("obtaining iterator", zap.Error(err))
return return
} }
defer iterator.Close() defer iterator.Close()

View File

@ -13,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
) )
// WakuENRField is the name of the ENR field that contains information about which protocols are supported by the node // WakuENRField is the name of the ENR field that contains information about which protocols are supported by the node
@ -50,95 +49,92 @@ func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
} }
// GetENRandIP returns a enr Node and TCP address obtained from a multiaddress. priv key and protocols supported // GetENRandIP returns a enr Node and TCP address obtained from a multiaddress. priv key and protocols supported
func GetENRandIP(addr multiaddr.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.PrivateKey) (*enode.Node, *net.TCPAddr, error) { func GetENRandIP(addrs []multiaddr.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.PrivateKey) (*enode.Node, error) {
var ip string
dns4, err := addr.ValueForProtocol(multiaddr.P_DNS4)
if err != nil {
ip, err = addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
return nil, nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, nil, err
}
ip = netIP.String()
}
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return nil, nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, nil, err
}
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
return nil, nil, err
}
r := &enr.Record{} r := &enr.Record{}
for _, addr := range addrs {
storeInMultiaddrsKey := false
var multiaddrItems []multiaddr.Multiaddr
_, err := addr.ValueForProtocol(multiaddr.P_WS)
if err == nil {
storeInMultiaddrsKey = true
multiaddrItems = append(multiaddrItems, addr)
}
if port > 0 && port <= math.MaxUint16 { _, err = addr.ValueForProtocol(multiaddr.P_WSS)
r.Set(enr.TCP(uint16(port))) // lgtm [go/incorrect-integer-conversion] if err == nil {
} else { storeInMultiaddrsKey = true
return nil, nil, fmt.Errorf("could not set port %d", port) multiaddrItems = append(multiaddrItems, addr)
}
if !storeInMultiaddrsKey {
var ip string
dns4, err := addr.ValueForProtocol(multiaddr.P_DNS4)
if err != nil {
ip, err = addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ip = netIP.String()
}
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
if port > 0 && port <= math.MaxUint16 {
r.Set(enr.TCP(uint16(port))) // lgtm [go/incorrect-integer-conversion]
} else {
return nil, fmt.Errorf("could not set port %d", port)
}
r.Set(enr.IP(net.ParseIP(ip)))
} else {
p2p, err := addr.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return nil, err
}
p2pAddr, err := multiaddr.NewMultiaddr("/p2p/" + p2p)
if err != nil {
return nil, fmt.Errorf("could not create p2p addr: %w", err)
}
var fieldRaw []byte
for _, ma := range multiaddrItems {
maRaw := ma.Decapsulate(p2pAddr).Bytes()
maSize := make([]byte, 2)
binary.BigEndian.PutUint16(maSize, uint16(len(maRaw)))
fieldRaw = append(fieldRaw, maSize...)
fieldRaw = append(fieldRaw, maRaw...)
}
if len(fieldRaw) != 0 {
r.Set(enr.WithEntry(MultiaddrENRField, fieldRaw))
}
}
} }
var multiaddrItems []multiaddr.Multiaddr
// 31/WAKU2-ENR
_, err = addr.ValueForProtocol(multiaddr.P_WS)
if err == nil {
multiaddrItems = append(multiaddrItems, addr)
}
_, err = addr.ValueForProtocol(multiaddr.P_WSS)
if err == nil {
multiaddrItems = append(multiaddrItems, addr)
}
p2p, err := addr.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return nil, nil, err
}
p2pAddr, err := multiaddr.NewMultiaddr("/p2p/" + p2p)
if err != nil {
return nil, nil, fmt.Errorf("could not create p2p addr: %w", err)
}
var fieldRaw []byte
for _, ma := range multiaddrItems {
maRaw := ma.Decapsulate(p2pAddr).Bytes()
maSize := make([]byte, 2)
binary.BigEndian.PutUint16(maSize, uint16(len(maRaw)))
fieldRaw = append(fieldRaw, maSize...)
fieldRaw = append(fieldRaw, maRaw...)
}
if len(fieldRaw) != 0 {
r.Set(enr.WithEntry(MultiaddrENRField, fieldRaw))
}
r.Set(enr.IP(net.ParseIP(ip)))
r.Set(enr.WithEntry(WakuENRField, wakuFlags)) r.Set(enr.WithEntry(WakuENRField, wakuFlags))
err := enode.SignV4(r, privK)
err = enode.SignV4(r, privK)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
node, err := enode.New(enode.ValidSchemes, r) node, err := enode.New(enode.ValidSchemes, r)
return node, tcpAddr, err return node, err
} }
// EnodeToMultiaddress converts an enode into a multiaddress // EnodeToMultiaddress converts an enode into a multiaddress
@ -160,21 +156,22 @@ func Multiaddress(node *enode.Node) ([]multiaddr.Multiaddr, error) {
return nil, err return nil, err
} }
var multiaddrRaw []byte var result []multiaddr.Multiaddr
if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil {
if enr.IsNotFound(err) { addr, err := enodeToMultiAddr(node)
Logger().Debug("trying to convert enode to multiaddress, since I could not retrieve multiaddress field for node ", zap.Any("enode", node)) if err != nil {
addr, err := enodeToMultiAddr(node)
if err != nil {
return nil, err
}
return []multiaddr.Multiaddr{addr}, nil
}
return nil, err return nil, err
} }
result = append(result, addr)
if len(multiaddrRaw) < 2 { var multiaddrRaw []byte
return nil, errors.New("invalid multiaddress field length") if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil {
if !enr.IsNotFound(err) {
return nil, err
} else {
// No multiaddr entry on enr
return result, nil
}
} }
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty())) hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty()))
@ -182,7 +179,6 @@ func Multiaddress(node *enode.Node) ([]multiaddr.Multiaddr, error) {
return nil, err return nil, err
} }
var result []multiaddr.Multiaddr
offset := 0 offset := 0
for { for {
maSize := binary.BigEndian.Uint16(multiaddrRaw[offset : offset+2]) maSize := binary.BigEndian.Uint16(multiaddrRaw[offset : offset+2])

2
vendor/modules.txt vendored
View File

@ -985,7 +985,7 @@ github.com/vacp2p/mvds/transport
github.com/waku-org/go-discover/discover github.com/waku-org/go-discover/discover
github.com/waku-org/go-discover/discover/v4wire github.com/waku-org/go-discover/discover/v4wire
github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-discover/discover/v5wire
# github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb # github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d
## explicit; go 1.18 ## explicit; go 1.18
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence github.com/waku-org/go-waku/waku/persistence