refactor: peerConnector (#665)

* refactor: peerConnector

* fix: code climate issues; don't wait on PeerData subscriptions

* fix: base the peerConnector check on outRelay connections

* fix: bug introduced in peerConnector
Author: harsh jain, 2023-08-28 10:47:48 +04:00 (committed by GitHub)
Parent: 09eb8ed19b
Commit: 467d1b2ca5
4 changed files with 48 additions and 102 deletions

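At a glance: the outbound-peer target now lives on the PeerManager, and NewPeerConnectionStrategy takes the manager itself instead of a maxOutPeers count, registering itself via pm.SetPeerConnector. The sketch below shows the resulting caller-side wiring assembled from the hunks that follow; it is illustrative only, and the helper name newPeerConnector plus the literal values (50 connections, 20s dial timeout) are placeholders, not go-waku API.

package main

import (
	"math/rand"
	"time"

	"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
	"github.com/waku-org/go-waku/waku/v2/peermanager"
	"go.uber.org/zap"
)

// newPeerConnector sketches the post-refactor wiring: the PeerManager is passed
// to NewPeerConnectionStrategy, which registers itself on the manager, so node
// code no longer calls SetPeerManager/SetPeerConnector explicitly.
func newPeerConnector(maxPeerConnections int, log *zap.Logger) (*peermanager.PeerConnectionStrategy, error) {
	pm := peermanager.NewPeerManager(maxPeerConnections, log)

	cacheSize := 600
	rngSrc := rand.NewSource(rand.Int63())
	minBackoff, maxBackoff := time.Minute, time.Hour
	bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))

	// The connector derives its outbound target from pm.OutRelayPeersTarget,
	// so no separate maxOutPeers argument is needed anymore.
	return peermanager.NewPeerConnectionStrategy(cacheSize, pm, 20*time.Second, bkf, log)
}

func main() {
	if _, err := newPeerConnector(50, zap.NewNop()); err != nil {
		panic(err)
	}
}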

@@ -253,7 +253,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
maxOutPeers := int(w.peermanager.OutRelayPeersTarget)
// Setup peer connection strategy
cacheSize := 600
@@ -261,11 +260,10 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
minBackoff, maxBackoff := time.Minute, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, maxOutPeers, discoveryConnectTimeout, bkf, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, w.peermanager, discoveryConnectTimeout, bkf, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
w.peermanager.SetPeerConnector(w.peerConnector)
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
@@ -400,7 +398,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.peerConnector.SetHost(host)
w.peermanager.SetHost(host)
w.peerConnector.SetPeerManager(w.peermanager)
err = w.peerConnector.Start(ctx)
if err != nil {
return err


@@ -17,6 +17,8 @@ import (
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"sync/atomic"
"go.uber.org/zap"
lru "github.com/hashicorp/golang-lru"
@@ -39,14 +41,10 @@ type PeerConnectionStrategy struct {
pm *PeerManager
cancel context.CancelFunc
paused bool
workerCtx context.Context
workerCancel context.CancelFunc
paused atomic.Bool
wg sync.WaitGroup
maxOutPeers int
dialTimeout time.Duration
peerCh chan PeerData
dialCh chan peer.AddrInfo
subscriptions []<-chan PeerData
@@ -62,7 +60,7 @@ type PeerConnectionStrategy struct {
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
func NewPeerConnectionStrategy(cacheSize int, pm *PeerManager,
dialTimeout time.Duration, backoff backoff.BackoffFactory,
logger *zap.Logger) (*PeerConnectionStrategy, error) {
@@ -70,15 +68,16 @@ func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
if err != nil {
return nil, err
}
return &PeerConnectionStrategy{
pc := &PeerConnectionStrategy{
cache: cache,
wg: sync.WaitGroup{},
maxOutPeers: maxOutPeers,
dialTimeout: dialTimeout,
pm: pm,
backoff: backoff,
logger: logger.Named("discovery-connector"),
}, nil
}
pm.SetPeerConnector(pc)
return pc, nil
}
type connCacheData struct {
@@ -101,18 +100,31 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa
func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) {
for {
// check ctx.Done() here so the loop can return even while the peerConnector is paused.
select {
case <-ctx.Done():
return
case p := <-ch:
default:
}
//
if !c.isPaused() {
select {
case <-ctx.Done():
return
case c.peerCh <- p:
case p, ok := <-ch:
if !ok {
return
}
c.pm.AddDiscoveredPeer(p)
c.publishWork(ctx, p.AddrInfo)
case <-time.After(1 * time.Second):
// This timeout prevents the goroutine from blocking indefinitely
break
}
} else {
time.Sleep(1 * time.Second) // sleep while the peerConnector is paused.
}
}
}
// SetHost sets the host to be able to mount or consume a protocol
@@ -120,11 +132,6 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) {
c.host = h
}
// SetPeerManager sets the peermanager in order to utilize add peer
func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
c.pm = pm
}
// Start attempts to connect to the peers passed in by peerCh.
// Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
@@ -134,12 +141,10 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
c.peerCh = make(chan PeerData)
c.dialCh = make(chan peer.AddrInfo)
c.wg.Add(3)
c.wg.Add(2)
go c.shouldDialPeers(ctx)
go c.workPublisher(ctx)
go c.dialPeers(ctx)
c.consumeSubscriptions(ctx)
@@ -154,19 +159,14 @@ func (c *PeerConnectionStrategy) Stop() {
}
c.cancel()
c.cancel = nil
c.wg.Wait()
close(c.peerCh)
close(c.dialCh)
c.subscriptions = nil
c.cancel = nil
}
func (c *PeerConnectionStrategy) isPaused() bool {
c.RLock()
defer c.RUnlock()
return c.paused
return c.paused.Load()
}
func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
@@ -174,38 +174,18 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
c.Lock()
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
isPaused := c.isPaused()
_, outRelayPeers, err := c.pm.GroupPeersByDirection()
if err != nil {
c.logger.Warn("failed to get outRelayPeers from peerstore", zap.Error(err))
continue
}
numPeers := outRelayPeers.Len()
if numPeers >= c.maxOutPeers && !isPaused {
c.Lock()
c.paused = true
c.workerCancel()
c.Unlock()
} else if numPeers < c.maxOutPeers && isPaused {
c.Lock()
c.paused = false
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()
}
_, outRelayPeers := c.pm.getRelayPeers()
c.paused.Store(outRelayPeers.Len() >= c.pm.OutRelayPeersTarget) // pause if the number of outbound relay peers is at or above the target
}
}
}
// Subscribe may be called before the peerConnector has started, so these subscriptions are stored in the subscriptions slice and consumed after c.cancel is set.
func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) {
for _, subs := range c.subscriptions {
c.wg.Add(1)
@@ -214,6 +194,7 @@ func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) {
c.consumeSubscription(ctx, s)
}(subs)
}
c.subscriptions = nil
}
func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
@@ -224,45 +205,19 @@ func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInf
}
}
func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
defer c.wg.Done()
for {
select {
case <-ctx.Done():
return
default:
isPaused := c.isPaused()
if !isPaused {
select {
case <-ctx.Done():
return
case p := <-c.peerCh:
c.pm.AddDiscoveredPeer(p)
c.publishWork(ctx, p.AddrInfo)
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
break
}
} else {
// Check if paused again
time.Sleep(1 * time.Second)
}
}
}
}
const maxActiveDials = 5
// c.cache is thread safe
// the mutex is only needed in case canDialPeer is queried concurrently for the same peer.
func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
c.mux.Lock()
defer c.mux.Unlock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
return false
}
@@ -272,14 +227,13 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
}
c.mux.Unlock()
return true
}
func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()
maxGoRoutines := c.maxOutPeers
maxGoRoutines := c.pm.OutRelayPeersTarget
if maxGoRoutines > maxActiveDials {
maxGoRoutines = maxActiveDials
}
@@ -301,9 +255,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
if c.canDialPeer(pi) {
sem <- struct{}{}
c.wg.Add(1)
go c.dialPeer(pi, sem)
} else {
continue
go c.dialPeer(ctx, pi, sem)
}
case <-ctx.Done():
return
@@ -311,11 +263,9 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
}
}
func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
func (c *PeerConnectionStrategy) dialPeer(ctx context.Context, pi peer.AddrInfo, sem chan struct{}) {
defer c.wg.Done()
c.RLock()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
c.RUnlock()
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {

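Before moving on to the PeerManager changes, a rough usage sketch of the connector API above may help: Subscribe can be called before Start (subscriptions are parked in the subscriptions slice and consumed once the connector runs), and dialing pauses itself whenever the outbound relay peer count reaches pm.OutRelayPeersTarget. startDiscovery is a hypothetical helper, not go-waku code; the host, context, and channel come from whatever discovery layer feeds the connector.

package example // hypothetical wrapper around the API shown in this diff

import (
	"context"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/waku-org/go-waku/waku/v2/peermanager"
)

func startDiscovery(ctx context.Context, pc *peermanager.PeerConnectionStrategy, h host.Host, discovered chan peermanager.PeerData) error {
	// Subscribe may run before Start; the channel is stored in c.subscriptions
	// and only consumed once consumeSubscriptions runs inside Start.
	pc.Subscribe(ctx, discovered)

	pc.SetHost(h)
	if err := pc.Start(ctx); err != nil {
		return err
	}

	// From here on, every PeerData pushed into the channel is added to the
	// PeerManager and queued for dialing, unless shouldDialPeers has set the
	// paused flag because outbound relay peers already meet OutRelayPeersTarget.
	return nil
}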

@@ -147,7 +147,7 @@ func (pm *PeerManager) connectToRelayPeers() {
inRelayPeers, outRelayPeers := pm.getRelayPeers()
if inRelayPeers.Len() > 0 &&
inRelayPeers.Len() > pm.InRelayPeersTarget {
pm.pruneInRelayConns(inRelayPeers, outRelayPeers)
pm.pruneInRelayConns(inRelayPeers)
}
if outRelayPeers.Len() > pm.OutRelayPeersTarget {
@@ -191,7 +191,7 @@ func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) {
return
}
func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
//Start disconnecting peers, based on what?
//For now, just disconnect most recently connected peers
@@ -256,7 +256,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocol
//Add Service peers to serviceSlots.
for _, proto := range protocols {
pm.AddPeerToServiceSlot(proto, info.ID, origin)
pm.AddPeerToServiceSlot(proto, info.ID)
}
//Add to the peer-store
@@ -286,7 +286,7 @@ func (pm *PeerManager) RemovePeer(peerID peer.ID) {
// AddPeerToServiceSlot adds a peerID to serviceSlot.
// Adding to peerStore is expected to be already done by caller.
// If relay proto is passed, it is not added to serviceSlot.
func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID, origin wps.Origin) {
func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
if proto == WakuRelayIDv200 {
pm.logger.Warn("Cannot add Relay peer to service peer slots")
return


@@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/peermanager"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@@ -45,7 +44,7 @@ func TestServiceSlots(t *testing.T) {
require.Equal(t, peerId, h2.ID())
//Test addition and selection from service-slot
pm.AddPeerToServiceSlot(protocol, h2.ID(), wps.Static)
pm.AddPeerToServiceSlot(protocol, h2.ID())
peerId, err = pm.SelectPeer(protocol, nil, utils.Logger())
require.NoError(t, err)
@@ -58,14 +57,14 @@ func TestServiceSlots(t *testing.T) {
require.Equal(t, peerId, h2.ID())
h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
pm.AddPeerToServiceSlot(protocol, h3.ID(), wps.Static)
pm.AddPeerToServiceSlot(protocol, h3.ID())
h4, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h4.Close()
h1.Peerstore().AddAddrs(h4.ID(), h4.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
pm.AddPeerToServiceSlot(protocol1, h4.ID(), wps.Static)
pm.AddPeerToServiceSlot(protocol1, h4.ID())
//Test peer selection from first added peer to serviceSlot
peerId, err = pm.SelectPeer(protocol, nil, utils.Logger())
@@ -91,7 +90,7 @@ func TestServiceSlots(t *testing.T) {
require.Error(t, err, utils.ErrNoPeersAvailable)
//Test peer selection for relay protocol from peer store
h1.Peerstore().AddAddrs(h5.ID(), h5.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID(), wps.Static)
pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID())
_, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger())
require.Error(t, err, utils.ErrNoPeersAvailable)