From 467d1b2ca54eeab8328908591f0864031df58961 Mon Sep 17 00:00:00 2001
From: harsh jain
Date: Mon, 28 Aug 2023 10:47:48 +0400
Subject: [PATCH] refactor: peerConnector (#665)

* refactor: peerConnector

* fix: address code climate issues and don't wait on subscriptions' PeerData

* fix: base the peerConnector pause check on outbound relay connections

* fix: bug introduced in peerConnector
---
 waku/v2/node/wakunode2.go                     |   5 +-
 waku/v2/peermanager/peer_connector.go         | 128 ++++++------------
 waku/v2/peermanager/peer_manager.go           |   8 +-
 waku/v2/peermanager/test/peer_manager_test.go |   9 +-
 4 files changed, 48 insertions(+), 102 deletions(-)

diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go
index 5297e28c..b21c3193 100644
--- a/waku/v2/node/wakunode2.go
+++ b/waku/v2/node/wakunode2.go
@@ -253,7 +253,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 
 	//Initialize peer manager.
 	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
-	maxOutPeers := int(w.peermanager.OutRelayPeersTarget)
 
 	// Setup peer connection strategy
 	cacheSize := 600
@@ -261,11 +260,10 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	minBackoff, maxBackoff := time.Minute, time.Hour
 	bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
 
-	w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, maxOutPeers, discoveryConnectTimeout, bkf, w.log)
+	w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, w.peermanager, discoveryConnectTimeout, bkf, w.log)
 	if err != nil {
 		w.log.Error("creating peer connection strategy", zap.Error(err))
 	}
-	w.peermanager.SetPeerConnector(w.peerConnector)
 
 	if w.opts.enableDiscV5 {
 		err := w.mountDiscV5()
@@ -400,7 +398,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
 
 	w.peerConnector.SetHost(host)
 	w.peermanager.SetHost(host)
-	w.peerConnector.SetPeerManager(w.peermanager)
 	err = w.peerConnector.Start(ctx)
 	if err != nil {
 		return err
diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go
index 460f6cd4..fa1e2a87 100644
--- a/waku/v2/peermanager/peer_connector.go
+++ b/waku/v2/peermanager/peer_connector.go
@@ -17,6 +17,8 @@ import (
 	"github.com/waku-org/go-waku/logging"
 	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
 
+	"sync/atomic"
+
 	"go.uber.org/zap"
 
 	lru "github.com/hashicorp/golang-lru"
@@ -39,14 +41,10 @@ type PeerConnectionStrategy struct {
 	pm     *PeerManager
 	cancel context.CancelFunc
 
-	paused       bool
-	workerCtx    context.Context
-	workerCancel context.CancelFunc
+	paused atomic.Bool
 
 	wg          sync.WaitGroup
-	maxOutPeers int
 	dialTimeout time.Duration
-	peerCh      chan PeerData
 	dialCh      chan peer.AddrInfo
 
 	subscriptions []<-chan PeerData
@@ -62,7 +60,7 @@ type PeerConnectionStrategy struct {
 // dialTimeout is how long we attempt to connect to a peer before giving up
-// minPeers is the minimum number of peers that the node should have
+// pm is the peer manager used to register discovered peers
 // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
-func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
+func NewPeerConnectionStrategy(cacheSize int, pm *PeerManager,
 	dialTimeout time.Duration, backoff backoff.BackoffFactory,
 	logger *zap.Logger) (*PeerConnectionStrategy, error) {
 
@@ -70,15 +68,16 @@ func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
 	if err != nil {
 		return nil, err
 	}
-
-	return &PeerConnectionStrategy{
+	pc := &PeerConnectionStrategy{
 		cache:       cache,
 		wg:          sync.WaitGroup{},
-		maxOutPeers: maxOutPeers,
 		dialTimeout: dialTimeout,
+		pm:          pm,
		backoff:     backoff,
		logger:      logger.Named("discovery-connector"),
-	}, nil
+	}
+	pm.SetPeerConnector(pc)
+	return pc, nil
 }
 
 type connCacheData struct {
@@ -101,18 +100,31 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa
 
 func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) {
 	for {
+		// non-blocking check so the loop can return promptly, even while the peerConnector is paused.
 		select {
 		case <-ctx.Done():
			return
-		case p := <-ch:
+		default:
+		}
+		// consume discovered peers only while dialing is not paused.
+		if !c.isPaused() {
 			select {
 			case <-ctx.Done():
 				return
-			case c.peerCh <- p:
+			case p, ok := <-ch:
+				if !ok {
+					return
+				}
+				c.pm.AddDiscoveredPeer(p)
+				c.publishWork(ctx, p.AddrInfo)
+			case <-time.After(1 * time.Second):
+				// timeout so the goroutine doesn't block and can re-check the paused flag
+				break
 			}
+		} else {
+			time.Sleep(1 * time.Second) // sleep while the peerConnector is paused.
 		}
 	}
-
 }
 
 // SetHost sets the host to be able to mount or consume a protocol
@@ -120,11 +132,6 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) {
 	c.host = h
 }
 
-// SetPeerManager sets the peermanager in order to utilize add peer
-func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
-	c.pm = pm
-}
-
-// Start attempts to connect to the peers passed in by peerCh.
+// Start attempts to connect to the peers received on the subscribed discovery channels.
 // Will not connect to peers if they are within the backoff period.
 func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
@@ -134,12 +141,10 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
 	ctx, cancel := context.WithCancel(ctx)
 	c.cancel = cancel
-	c.peerCh = make(chan PeerData)
 	c.dialCh = make(chan peer.AddrInfo)
 
-	c.wg.Add(3)
+	c.wg.Add(2)
 	go c.shouldDialPeers(ctx)
-	go c.workPublisher(ctx)
 	go c.dialPeers(ctx)
 
 	c.consumeSubscriptions(ctx)
@@ -154,19 +159,14 @@ func (c *PeerConnectionStrategy) Stop() {
 	}
 
 	c.cancel()
+	c.cancel = nil
 	c.wg.Wait()
 
-	close(c.peerCh)
 	close(c.dialCh)
-
-	c.subscriptions = nil
-	c.cancel = nil
 }
 
 func (c *PeerConnectionStrategy) isPaused() bool {
-	c.RLock()
-	defer c.RUnlock()
-	return c.paused
+	return c.paused.Load()
 }
 
 func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
@@ -174,38 +174,18 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
 
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
-
-	c.Lock()
-	c.workerCtx, c.workerCancel = context.WithCancel(ctx)
-	c.Unlock()
-
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			isPaused := c.isPaused()
-			_, outRelayPeers, err := c.pm.GroupPeersByDirection()
-			if err != nil {
-				c.logger.Warn("failed to get outRelayPeers from peerstore", zap.Error(err))
-				continue
-			}
-			numPeers := outRelayPeers.Len()
-			if numPeers >= c.maxOutPeers && !isPaused {
-				c.Lock()
-				c.paused = true
-				c.workerCancel()
-				c.Unlock()
-			} else if numPeers < c.maxOutPeers && isPaused {
-				c.Lock()
-				c.paused = false
-				c.workerCtx, c.workerCancel = context.WithCancel(ctx)
-				c.Unlock()
-			}
+			_, outRelayPeers := c.pm.getRelayPeers()
+			c.paused.Store(outRelayPeers.Len() >= c.pm.OutRelayPeersTarget) // pause when outbound relay peers reach or exceed the target
 		}
 	}
 }
 
+// Subscribe may be called before the peerConnector has started, so store such subscriptions in the subscriptions slice and consume them once Start sets c.cancel.
 func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) {
 	for _, subs := range c.subscriptions {
 		c.wg.Add(1)
 		go func(s <-chan PeerData) {
 			defer c.wg.Done()
 			c.consumeSubscription(ctx, s)
 		}(subs)
 	}
+	c.subscriptions = nil
 }
 
 func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
@@ -224,45 +205,19 @@ func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInf
 	}
 }
 
-func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
-	defer c.wg.Done()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			isPaused := c.isPaused()
-			if !isPaused {
-				select {
-				case <-ctx.Done():
-					return
-				case p := <-c.peerCh:
-					c.pm.AddDiscoveredPeer(p)
-					c.publishWork(ctx, p.AddrInfo)
-				case <-time.After(1 * time.Second):
-					// This timeout is to not lock the goroutine
-					break
-				}
-			} else {
-				// Check if paused again
-				time.Sleep(1 * time.Second)
-			}
-		}
-	}
-}
-
 const maxActiveDials = 5
 
+// c.cache is thread-safe on its own; the mutex is only needed in case
+// canDialPeer is queried concurrently for the same peer.
 func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
 	c.mux.Lock()
+	defer c.mux.Unlock()
 	val, ok := c.cache.Get(pi.ID)
 	var cachedPeer *connCacheData
 	if ok {
 		tv := val.(*connCacheData)
 		now := time.Now()
 		if now.Before(tv.nextTry) {
-			c.mux.Unlock()
 			return false
 		}
 
@@ -272,14 +227,13 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
 		cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
 		c.cache.Add(pi.ID, cachedPeer)
 	}
-	c.mux.Unlock()
 
 	return true
 }
 
 func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
 	defer c.wg.Done()
 
-	maxGoRoutines := c.maxOutPeers
+	maxGoRoutines := c.pm.OutRelayPeersTarget
 	if maxGoRoutines > maxActiveDials {
 		maxGoRoutines = maxActiveDials
 	}
@@ -301,9 +255,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
 			if c.canDialPeer(pi) {
 				sem <- struct{}{}
 				c.wg.Add(1)
-				go c.dialPeer(pi, sem)
-			} else {
-				continue
+				go c.dialPeer(ctx, pi, sem)
 			}
 		case <-ctx.Done():
 			return
@@ -311,11 +263,9 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
 	}
 }
 
-func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
+func (c *PeerConnectionStrategy) dialPeer(ctx context.Context, pi peer.AddrInfo, sem chan struct{}) {
 	defer c.wg.Done()
-	c.RLock()
-	ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
-	c.RUnlock()
+	ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
 	defer cancel()
 	err := c.host.Connect(ctx, pi)
 	if err != nil && !errors.Is(err, context.Canceled) {
diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go
index 71cc2586..e0607bd0 100644
--- a/waku/v2/peermanager/peer_manager.go
+++ b/waku/v2/peermanager/peer_manager.go
@@ -147,7 +147,7 @@ func (pm *PeerManager) connectToRelayPeers() {
 	inRelayPeers, outRelayPeers := pm.getRelayPeers()
 
 	if inRelayPeers.Len() > 0 && inRelayPeers.Len() > pm.InRelayPeersTarget {
-		pm.pruneInRelayConns(inRelayPeers, outRelayPeers)
+		pm.pruneInRelayConns(inRelayPeers)
 	}
 
 	if outRelayPeers.Len() > pm.OutRelayPeersTarget {
@@ -191,7 +191,7 @@ func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) {
 	return
 }
 
-func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
+func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
 
 	//Start disconnecting peers, based on what?
//For now, just disconnect most recently connected peers @@ -256,7 +256,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocol //Add Service peers to serviceSlots. for _, proto := range protocols { - pm.AddPeerToServiceSlot(proto, info.ID, origin) + pm.AddPeerToServiceSlot(proto, info.ID) } //Add to the peer-store @@ -286,7 +286,7 @@ func (pm *PeerManager) RemovePeer(peerID peer.ID) { // AddPeerToServiceSlot adds a peerID to serviceSlot. // Adding to peerStore is expected to be already done by caller. // If relay proto is passed, it is not added to serviceSlot. -func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID, origin wps.Origin) { +func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { if proto == WakuRelayIDv200 { pm.logger.Warn("Cannot add Relay peer to service peer slots") return diff --git a/waku/v2/peermanager/test/peer_manager_test.go b/waku/v2/peermanager/test/peer_manager_test.go index bd8c71b4..cd40e476 100644 --- a/waku/v2/peermanager/test/peer_manager_test.go +++ b/waku/v2/peermanager/test/peer_manager_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/peermanager" - wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -45,7 +44,7 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, peerId, h2.ID()) //Test addition and selection from service-slot - pm.AddPeerToServiceSlot(protocol, h2.ID(), wps.Static) + pm.AddPeerToServiceSlot(protocol, h2.ID()) peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) require.NoError(t, err) @@ -58,14 +57,14 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, peerId, h2.ID()) h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(protocol, h3.ID(), wps.Static) + pm.AddPeerToServiceSlot(protocol, h3.ID()) h4, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) defer h4.Close() h1.Peerstore().AddAddrs(h4.ID(), h4.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(protocol1, h4.ID(), wps.Static) + pm.AddPeerToServiceSlot(protocol1, h4.ID()) //Test peer selection from first added peer to serviceSlot peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) @@ -91,7 +90,7 @@ func TestServiceSlots(t *testing.T) { require.Error(t, err, utils.ErrNoPeersAvailable) //Test peer selection for relay protocol from peer store h1.Peerstore().AddAddrs(h5.ID(), h5.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID(), wps.Static) + pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID()) _, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) require.Error(t, err, utils.ErrNoPeersAvailable)
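
Reviewer note: the heart of this refactor is replacing the mutex-guarded `paused` flag and the cancellable `workerCtx`/`workerCancel` pair with a single `atomic.Bool` that `shouldDialPeers` refreshes once per second and `consumeSubscription` polls. Below is a minimal, self-contained sketch of that pattern, not go-waku code: `connector`, `countOutPeers`, and `target` are illustrative stand-ins for `PeerConnectionStrategy`, `getRelayPeers`, and `OutRelayPeersTarget`.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// connector is a stand-in for PeerConnectionStrategy, reduced to the
// pause mechanism this patch introduces.
type connector struct {
	paused atomic.Bool
	target int // stand-in for pm.OutRelayPeersTarget
}

// shouldDial mirrors shouldDialPeers: once per second, pause dialing
// whenever the outbound relay peer count has reached the target.
func (c *connector) shouldDial(ctx context.Context, countOutPeers func() int) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.paused.Store(countOutPeers() >= c.target)
		}
	}
}

// consume mirrors consumeSubscription: skip receives while paused, and put
// a timeout on each receive so the goroutine can re-check the flag instead
// of blocking forever on an idle discovery channel.
func (c *connector) consume(ctx context.Context, ch <-chan int) {
	for {
		// non-blocking exit check, usable even while paused
		select {
		case <-ctx.Done():
			return
		default:
		}
		if c.paused.Load() {
			time.Sleep(1 * time.Second) // paused: back off, then re-check
			continue
		}
		select {
		case <-ctx.Done():
			return
		case p, ok := <-ch:
			if !ok {
				return
			}
			fmt.Println("would dial discovered peer", p)
		case <-time.After(1 * time.Second):
			// timeout so a pause or cancellation is noticed promptly
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	c := &connector{target: 2}
	var out atomic.Int32 // pretend outbound-connection counter
	ch := make(chan int)

	go c.shouldDial(ctx, func() int { return int(out.Load()) })
	go func() { // fake discovery feed; every "dial" adds an outbound conn
		for i := 1; ; i++ {
			select {
			case ch <- i:
				out.Add(1)
			case <-ctx.Done():
				return
			}
		}
	}()
	c.consume(ctx, ch)
}
```

The 1-second receive timeout is what lets this work without a cancellable worker context: a consumer blocked on an idle channel wakes up regularly to observe a pause or a shutdown.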
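
The other pattern worth calling out is the bounded dial loop that `dialPeers` keeps: a buffered channel acts as a counting semaphore capping in-flight dials at min(`OutRelayPeersTarget`, `maxActiveDials`), and each dial now derives its timeout from the caller's context rather than the removed `workerCtx`. A sketch under the same caveats, with `dial` standing in for `host.Connect`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const maxActiveDials = 5

// dialLoop sketches dialPeers: a buffered channel used as a counting
// semaphore caps concurrent dials at min(outRelayPeersTarget, maxActiveDials).
func dialLoop(ctx context.Context, dialCh <-chan string, outRelayPeersTarget int,
	dial func(context.Context, string) error) {

	maxGoRoutines := outRelayPeersTarget
	if maxGoRoutines > maxActiveDials {
		maxGoRoutines = maxActiveDials
	}

	sem := make(chan struct{}, maxGoRoutines)
	var wg sync.WaitGroup
	defer wg.Wait()

	for {
		select {
		case addr, ok := <-dialCh:
			if !ok {
				return
			}
			sem <- struct{}{} // blocks once maxGoRoutines dials are in flight
			wg.Add(1)
			go func(addr string) {
				defer wg.Done()
				// per-dial timeout derived from the loop's context, the way
				// dialPeer now derives it from ctx instead of workerCtx
				dctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
				defer cancel()
				if err := dial(dctx, addr); err != nil {
					fmt.Println("dial failed:", addr, err)
				}
				<-sem // release the slot
			}(addr)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	dialCh := make(chan string)
	go func() {
		defer close(dialCh)
		for i := 0; i < 10; i++ {
			dialCh <- fmt.Sprintf("peer-%d", i)
		}
	}()

	// dial is a stand-in for host.Connect; here it just sleeps.
	dialLoop(ctx, dialCh, 10, func(ctx context.Context, addr string) error {
		select {
		case <-time.After(100 * time.Millisecond):
			fmt.Println("connected to", addr)
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
}
```

Releasing the semaphore slot inside the goroutine, rather than in the loop, is what lets the loop keep accepting work while earlier dials are still in flight.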