fix: parallelize filter subs to different peers (#1169)

This commit is contained in:
Prem Chaitanya Prathi 2024-07-30 18:06:41 +05:30 committed by GitHub
parent 76d8fd687d
commit e1e136cc68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 20 deletions

View File

@ -407,21 +407,35 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
paramsCopy := params.Copy() paramsCopy := params.Copy()
paramsCopy.selectedPeers = selectedPeers paramsCopy.selectedPeers = selectedPeers
for _, peer := range selectedPeers { var wg sync.WaitGroup
err := wf.request( reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
ctx, defer cancel()
params.requestID, tmpSubs := make([]*subscription.SubscriptionDetails, len(selectedPeers))
pb.FilterSubscribeRequest_SUBSCRIBE, for i, peerID := range selectedPeers {
cFilter, wg.Add(1)
peer) go func(index int, ID peer.ID) {
if err != nil { defer wg.Done()
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), err := wf.request(
zap.Error(err)) reqCtx,
failedContentTopics = append(failedContentTopics, cTopics...) params.requestID,
continue pb.FilterSubscribeRequest_SUBSCRIBE,
cFilter,
ID)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
} else {
wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID))
tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter)
}
}(i, peerID)
}
wg.Wait()
for _, sub := range tmpSubs {
if sub != nil {
subscriptions = append(subscriptions, sub)
} }
wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer))
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter))
} }
} }

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"math" "math"
"sync" "sync"
"time"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
@ -328,19 +329,21 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
logger.Debug("publishing message", zap.Stringers("peers", params.selectedPeers)) logger.Debug("publishing message", zap.Stringers("peers", params.selectedPeers))
var wg sync.WaitGroup var wg sync.WaitGroup
var responses []*pb.PushResponse responses := make([]*pb.PushResponse, params.selectedPeers.Len())
for _, peerID := range params.selectedPeers { reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
for i, peerID := range params.selectedPeers {
wg.Add(1) wg.Add(1)
go func(id peer.ID) { go func(index int, id peer.ID) {
paramsValue := *params paramsValue := *params
paramsValue.requestID = protocol.GenerateRequestID() paramsValue.requestID = protocol.GenerateRequestID()
defer wg.Done() defer wg.Done()
response, err := wakuLP.request(ctx, req, &paramsValue, id) response, err := wakuLP.request(reqCtx, req, &paramsValue, id)
if err != nil { if err != nil {
logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id)) logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id))
} }
responses = append(responses, response) responses[index] = response
}(peerID) }(i, peerID)
} }
wg.Wait() wg.Wait()
var successCount int var successCount int