From e1e136cc68a9f6aa3202b6206d4a3c99bd35831a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 30 Jul 2024 18:06:41 +0530 Subject: [PATCH] fix: parallelize filter subs to different peers (#1169) --- waku/v2/protocol/filter/client.go | 42 +++++++++++++------- waku/v2/protocol/lightpush/waku_lightpush.go | 15 ++++--- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 52b4efa6..c52c9098 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -407,21 +407,35 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot paramsCopy := params.Copy() paramsCopy.selectedPeers = selectedPeers - for _, peer := range selectedPeers { - err := wf.request( - ctx, - params.requestID, - pb.FilterSubscribeRequest_SUBSCRIBE, - cFilter, - peer) - if err != nil { - wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), - zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) - continue + var wg sync.WaitGroup + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + tmpSubs := make([]*subscription.SubscriptionDetails, len(selectedPeers)) + for i, peerID := range selectedPeers { + wg.Add(1) + go func(index int, ID peer.ID) { + defer wg.Done() + err := wf.request( + reqCtx, + params.requestID, + pb.FilterSubscribeRequest_SUBSCRIBE, + cFilter, + ID) + if err != nil { + wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + } else { + wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID)) + tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter) + } + }(i, peerID) + } + wg.Wait() + for _, sub := range tmpSubs { + if sub != nil { + subscriptions = append(subscriptions, sub) } - wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer)) - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter)) } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 10028fdd..c0a72c2e 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -328,19 +329,21 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa logger.Debug("publishing message", zap.Stringers("peers", params.selectedPeers)) var wg sync.WaitGroup - var responses []*pb.PushResponse - for _, peerID := range params.selectedPeers { + responses := make([]*pb.PushResponse, params.selectedPeers.Len()) + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + for i, peerID := range params.selectedPeers { wg.Add(1) - go func(id peer.ID) { + go func(index int, id peer.ID) { paramsValue := *params paramsValue.requestID = protocol.GenerateRequestID() defer wg.Done() - response, err := wakuLP.request(ctx, req, ¶msValue, id) + response, err := wakuLP.request(reqCtx, req, ¶msValue, id) if err != nil { logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id)) } - responses = append(responses, response) - }(peerID) + responses[index] = response + }(i, peerID) } wg.Wait() var successCount int