From a650469fae891c7a2e9afad54c510e0c62920cc6 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Tue, 19 Sep 2023 07:39:39 +0700 Subject: [PATCH] feat: use CommonService in peerConnector (#737) * feat(CommonService): add channel and use commonService in discv5 * fix: add mutex to PushToChan * fix: remove generic functionality * feat: use CommonService in peerConnector * fix: remove generic functionality * nit: add error log --- waku/v2/peermanager/peer_connector.go | 135 +++++++++++--------------- waku/v2/peermanager/peer_manager.go | 16 ++- waku/v2/rendezvous/rendezvous.go | 1 + 3 files changed, 69 insertions(+), 83 deletions(-) diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index cf7414b9..33c4c8ae 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -7,6 +7,7 @@ import ( "errors" "math/rand" "sync" + "sync/atomic" "time" "github.com/libp2p/go-libp2p/core/host" @@ -17,8 +18,6 @@ import ( "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - "sync/atomic" - "go.uber.org/zap" lru "github.com/hashicorp/golang-lru" @@ -27,22 +26,17 @@ import ( // PeerConnectionStrategy is a utility to connect to peers, // but only if we have not recently tried connecting to them already type PeerConnectionStrategy struct { - sync.RWMutex + mux sync.Mutex + cache *lru.TwoQueueCache + host host.Host + pm *PeerManager - cache *lru.TwoQueueCache - host host.Host - pm *PeerManager - cancel context.CancelFunc - - paused atomic.Bool - - wg sync.WaitGroup - dialTimeout time.Duration - dialCh chan peer.AddrInfo + paused atomic.Bool + dialTimeout time.Duration + *CommonDiscoveryService subscriptions []<-chan PeerData backoff backoff.BackoffFactory - mux sync.Mutex logger *zap.Logger } @@ -69,12 +63,12 @@ func NewPeerConnectionStrategy(pm *PeerManager, } // pc := &PeerConnectionStrategy{ - cache: cache, - wg: sync.WaitGroup{}, - dialTimeout: dialTimeout, - pm: pm, - backoff: getBackOff(), - logger: logger.Named("discovery-connector"), + cache: cache, + dialTimeout: dialTimeout, + CommonDiscoveryService: NewCommonDiscoveryService(), + pm: pm, + backoff: getBackOff(), + logger: logger.Named("discovery-connector"), } pm.SetPeerConnector(pc) return pc, nil @@ -87,36 +81,40 @@ type connCacheData struct { // Subscribe receives channels on which discovered peers should be pushed func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) { - if c.cancel != nil { - c.wg.Add(1) - go func() { - defer c.wg.Done() - c.consumeSubscription(ctx, ch) - }() - } else { + // if not running yet, store the subscription and return + if err := c.ErrOnNotRunning(); err != nil { + c.mux.Lock() c.subscriptions = append(c.subscriptions, ch) + c.mux.Unlock() + return } + // if running start a goroutine to consume the subscription + c.WaitGroup().Add(1) + go func() { + defer c.WaitGroup().Done() + c.consumeSubscription(ch) + }() } -func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) { +func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) { for { // for returning from the loop when peerConnector is paused. select { - case <-ctx.Done(): + case <-c.Context().Done(): return default: } // if !c.isPaused() { select { - case <-ctx.Done(): + case <-c.Context().Done(): return case p, ok := <-ch: if !ok { return } c.pm.AddDiscoveredPeer(p) - c.publishWork(ctx, p.AddrInfo) + c.PushToChan(p) case <-time.After(1 * time.Second): // This timeout is to not lock the goroutine break @@ -135,48 +133,36 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) { // Start attempts to connect to the peers passed in by peerCh. // Will not connect to peers if they are within the backoff period. func (c *PeerConnectionStrategy) Start(ctx context.Context) error { - if c.cancel != nil { - return errors.New("already started") - } + return c.CommonDiscoveryService.Start(ctx, c.start) - ctx, cancel := context.WithCancel(ctx) - c.cancel = cancel - c.dialCh = make(chan peer.AddrInfo) +} +func (c *PeerConnectionStrategy) start() error { + c.WaitGroup().Add(2) + go c.shouldDialPeers() + go c.dialPeers() - c.wg.Add(2) - go c.shouldDialPeers(ctx) - go c.dialPeers(ctx) - - c.consumeSubscriptions(ctx) + c.consumeSubscriptions() return nil } // Stop terminates the peer-connector func (c *PeerConnectionStrategy) Stop() { - if c.cancel == nil { - return - } - - c.cancel() - c.cancel = nil - c.wg.Wait() - - close(c.dialCh) + c.CommonDiscoveryService.Stop(func() {}) } func (c *PeerConnectionStrategy) isPaused() bool { return c.paused.Load() } -func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) { - defer c.wg.Done() +func (c *PeerConnectionStrategy) shouldDialPeers() { + defer c.WaitGroup().Done() ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { - case <-ctx.Done(): + case <-c.Context().Done(): return case <-ticker.C: _, outRelayPeers := c.pm.getRelayPeers() @@ -186,25 +172,17 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) { } // it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set. -func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) { +func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { - c.wg.Add(1) + c.WaitGroup().Add(1) go func(s <-chan PeerData) { - defer c.wg.Done() - c.consumeSubscription(ctx, s) + defer c.WaitGroup().Done() + c.consumeSubscription(s) }(subs) } c.subscriptions = nil } -func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) { - select { - case c.dialCh <- p: - case <-ctx.Done(): - return - } -} - const maxActiveDials = 5 // c.cache is thread safe @@ -230,8 +208,8 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool { return true } -func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { - defer c.wg.Done() +func (c *PeerConnectionStrategy) dialPeers() { + defer c.WaitGroup().Done() maxGoRoutines := c.pm.OutRelayPeersTarget if maxGoRoutines > maxActiveDials { @@ -242,30 +220,31 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { for { select { - case pi, ok := <-c.dialCh: + case pd, ok := <-c.GetListeningChan(): if !ok { return } + addrInfo := pd.AddrInfo - if pi.ID == c.host.ID() || pi.ID == "" || - c.host.Network().Connectedness(pi.ID) == network.Connected { + if addrInfo.ID == c.host.ID() || addrInfo.ID == "" || + c.host.Network().Connectedness(addrInfo.ID) == network.Connected { continue } - if c.canDialPeer(pi) { + if c.canDialPeer(addrInfo) { sem <- struct{}{} - c.wg.Add(1) - go c.dialPeer(ctx, pi, sem) + c.WaitGroup().Add(1) + go c.dialPeer(addrInfo, sem) } - case <-ctx.Done(): + case <-c.Context().Done(): return } } } -func (c *PeerConnectionStrategy) dialPeer(ctx context.Context, pi peer.AddrInfo, sem chan struct{}) { - defer c.wg.Done() - ctx, cancel := context.WithTimeout(ctx, c.dialTimeout) +func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { + defer c.WaitGroup().Done() + ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout) defer cancel() err := c.host.Connect(ctx, pi) if err != nil && !errors.Is(err, context.Canceled) { diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 87ddeae9..d5b83f85 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -172,13 +172,19 @@ func (pm *PeerManager) connectToRelayPeers() { } //Else: Should we raise some sort of unhealthy event?? } +func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) PeerData { + return PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: peerID, + Addrs: host.Peerstore().Addrs(peerID), + }, + } +} func (pm *PeerManager) connectToPeers(peers peer.IDSlice) { for _, peerID := range peers { - peerInfo := peer.AddrInfo{ - ID: peerID, - Addrs: pm.host.Peerstore().Addrs(peerID), - } - pm.peerConnector.publishWork(pm.ctx, peerInfo) + peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host) + pm.peerConnector.PushToChan(peerData) } } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index e28d7b15..a5ea444c 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -110,6 +110,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string PubSubTopics: []string{namespace}, } if !r.PushToChan(peer) { + r.log.Error("could push to closed channel/context completed") return } }