From 88d52d64171a43d3c72c70673f2ba686714dfd6a Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 6 Jul 2023 10:19:51 -0400 Subject: [PATCH] fix: set concurrent dials to 5 --- waku/v2/discovery_connector.go | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/waku/v2/discovery_connector.go b/waku/v2/discovery_connector.go index 18724621..03914e34 100644 --- a/waku/v2/discovery_connector.go +++ b/waku/v2/discovery_connector.go @@ -245,8 +245,18 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) { } } +const maxActiveDials = 5 + func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { defer c.wg.Done() + + maxGoRoutines := c.minPeers + if maxGoRoutines > maxActiveDials { + maxGoRoutines = maxActiveDials + } + + sem := make(chan struct{}, maxGoRoutines) + for { select { case pi, ok := <-c.dialCh: @@ -281,13 +291,19 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { } c.mux.Unlock() - dialCtx, dialCtxCancel := context.WithTimeout(c.workerCtx, c.dialTimeout) - err := c.host.Connect(dialCtx, pi) - if err != nil && !errors.Is(err, context.Canceled) { - c.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(pi) - c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) - } - dialCtxCancel() + sem <- struct{}{} + c.wg.Add(1) + go func(pi peer.AddrInfo) { + defer c.wg.Done() + ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout) + defer cancel() + err := c.host.Connect(ctx, pi) + if err != nil && !errors.Is(err, context.Canceled) { + c.host.Peerstore().(peers.WakuPeerstore).AddConnFailure(pi) + c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) + } + <-sem + }(pi) case <-ctx.Done(): return