diff --git a/waku/v2/protocol/filterv2/client.go b/waku/v2/protocol/filterv2/client.go index 40167089..d737fcaf 100644 --- a/waku/v2/protocol/filterv2/client.go +++ b/waku/v2/protocol/filterv2/client.go @@ -4,7 +4,9 @@ import ( "context" "encoding/hex" "errors" + "fmt" "math" + "net/http" "sync" "github.com/libp2p/go-libp2p/core/host" @@ -46,6 +48,11 @@ type ContentFilter struct { ContentTopics []string } +type WakuFilterPushResult struct { + err error + peerID peer.ID +} + // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterPush { wf := new(WakuFilterPush) @@ -141,8 +148,10 @@ func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribePa if err != nil { return err } + defer conn.Close() writer := protoio.NewDelimitedWriter(conn) + reader := protoio.NewDelimitedReader(conn, math.MaxInt32) request := &pb.FilterSubscribeRequest{ RequestId: hex.EncodeToString(params.requestId), @@ -158,13 +167,29 @@ func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribePa return err } - defer conn.Close() + filterSubscribeResponse := &pb.FilterSubscribeResponse{} + err = reader.ReadMsg(filterSubscribeResponse) + if err != nil { + wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err)) + return err + } + + if filterSubscribeResponse.StatusCode != http.StatusOK { + return fmt.Errorf("filter err: %d, %s", filterSubscribeResponse.StatusCode, filterSubscribeResponse.StatusDesc) + } + return nil } // Subscribe setups a subscription to receive messages that match a specific content filter func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) error { - // TODO: validate content filters + if contentFilter.Topic == "" { + return errors.New("topic is required") + } + + if len(contentFilter.ContentTopics) == 0 { + return errors.New("at least one content topic is required") + } params := new(FilterSubscribeParameters) params.log = wf.log @@ -208,21 +233,31 @@ func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOpti } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) error { - // TODO: checks if a subscription exists with the chosen criteria +func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (error, <-chan WakuFilterPushResult) { + if contentFilter.Topic == "" { + return errors.New("topic is required"), nil + } + + if len(contentFilter.ContentTopics) == 0 { + return errors.New("at least one content topic is required"), nil + } params, err := wf.getUnsubscribeParameters(opts...) if err != nil { - return err + return err, nil } + localWg := sync.WaitGroup{} + resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) + for peerID := range wf.subscriptions.items { if !params.unsubscribeAll && peerID != params.selectedPeer { continue } + localWg.Add(1) go func(peerID peer.ID) { - defer wf.wg.Done() + defer localWg.Done() err := wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID}, @@ -231,28 +266,38 @@ func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter Content if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } + + resultChan <- WakuFilterPushResult{ + err: err, + peerID: peerID, + } }(peerID) } - return nil + localWg.Wait() + close(resultChan) + + return nil, resultChan } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) error { +func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { - return err + return nil, err } wf.subscriptions.Lock() defer wf.subscriptions.Unlock() - wf.wg.Add(len(wf.subscriptions.items)) + localWg := sync.WaitGroup{} + resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) + for peerID := range wf.subscriptions.items { if !params.unsubscribeAll && peerID != params.selectedPeer { continue } - + localWg.Add(1) go func(peerID peer.ID) { defer wf.wg.Done() err := wf.request( @@ -263,8 +308,16 @@ func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsu if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } + + resultChan <- WakuFilterPushResult{ + err: err, + peerID: peerID, + } }(peerID) } - return nil + localWg.Wait() + close(resultChan) + + return resultChan, nil } diff --git a/waku/v2/protocol/filterv2/server.go b/waku/v2/protocol/filterv2/server.go index 213e9198..eb53b86c 100644 --- a/waku/v2/protocol/filterv2/server.go +++ b/waku/v2/protocol/filterv2/server.go @@ -21,7 +21,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "go.opencensus.io/tag" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) // FilterSubscribeID_v20beta1 is the current Waku Filter protocol identifier for servers to @@ -146,6 +145,12 @@ func (wf *WakuFilter) ping(s network.Stream, logger *zap.Logger, request *pb.Fil func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { if request.PubsubTopic == "" { reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") + return + } + + if len(request.ContentTopics) == 0 { + reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified") + return } peerID := s.Conn().RemotePeer() @@ -158,6 +163,12 @@ func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *p func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { if request.PubsubTopic == "" { reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") + return + } + + if len(request.ContentTopics) == 0 { + reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified") + return } err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics) @@ -186,7 +197,6 @@ func (wf *WakuFilter) filterListener(ctx context.Context) { msg := envelope.Message() pubsubTopic := envelope.PubsubTopic() logger := wf.log.With(logging.HexBytes("envelopeHash", envelope.Hash())) - g := new(errgroup.Group) // Each subscriber is a light node that earlier on invoked // a FilterRequest on this node @@ -195,16 +205,17 @@ func (wf *WakuFilter) filterListener(ctx context.Context) { subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines // Do a message push to light node logger.Info("pushing message to light node") - g.Go(func() (err error) { - err = wf.pushMessage(ctx, subscriber, envelope) + wf.wg.Add(1) + go func(subscriber peer.ID) { + defer wf.wg.Done() + err := wf.pushMessage(ctx, subscriber, envelope) if err != nil { logger.Error("pushing message", zap.Error(err)) } - return err - }) + }(subscriber) } - return g.Wait() + return nil } for m := range wf.msgC { diff --git a/waku/v2/protocol/filterv2/subscribers_map.go b/waku/v2/protocol/filterv2/subscribers_map.go index cebfe30b..bcd23f9a 100644 --- a/waku/v2/protocol/filterv2/subscribers_map.go +++ b/waku/v2/protocol/filterv2/subscribers_map.go @@ -59,14 +59,9 @@ func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics sub.items[peerID] = pubsubTopicMap - if len(contentTopics) == 0 { - // Interested in all messages for a pubsub topic - sub.addToInterestMap(peerID, pubsubTopic, nil) - } else { - for _, c := range contentTopics { - c := c - sub.addToInterestMap(peerID, pubsubTopic, &c) - } + for _, c := range contentTopics { + c := c + sub.addToInterestMap(peerID, pubsubTopic, c) } } @@ -102,29 +97,16 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop return ErrNotFound } - if len(contentTopics) == 0 { - // Remove all content topics related to this pubsub topic - for c := range contentTopicsMap { - c := c - delete(contentTopicsMap, c) - sub.removeFromInterestMap(peerID, pubsubTopic, &c) - } + // Removing content topics individually + for _, c := range contentTopics { + c := c + delete(contentTopicsMap, c) + sub.removeFromInterestMap(peerID, pubsubTopic, c) + } + // No more content topics available. Removing content topic completely + if len(contentTopicsMap) == 0 { delete(pubsubTopicMap, pubsubTopic) - sub.removeFromInterestMap(peerID, pubsubTopic, nil) - } else { - // Removing content topics individually - for _, c := range contentTopics { - c := c - delete(contentTopicsMap, c) - sub.removeFromInterestMap(peerID, pubsubTopic, &c) - } - - // No more content topics available. Removing subscription completely - if len(contentTopicsMap) == 0 { - delete(pubsubTopicMap, pubsubTopic) - sub.removeFromInterestMap(peerID, pubsubTopic, nil) - } } pubsubTopicMap[pubsubTopic] = contentTopicsMap @@ -142,13 +124,8 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error { for pubsubTopic, contentTopicsMap := range pubsubTopicMap { // Remove all content topics related to this pubsub topic for c := range contentTopicsMap { - c := c - delete(contentTopicsMap, c) - sub.removeFromInterestMap(peerID, pubsubTopic, &c) + sub.removeFromInterestMap(peerID, pubsubTopic, c) } - - delete(pubsubTopicMap, pubsubTopic) - sub.removeFromInterestMap(peerID, pubsubTopic, nil) } delete(sub.items, peerID) @@ -167,29 +144,19 @@ func (sub *SubscribersMap) RemoveAll() { sub.Lock() defer sub.Unlock() - for k /*, _ v*/ := range sub.items { - //close(v.Chan) - delete(sub.items, k) - } + sub.items = make(map[peer.ID]PubsubTopics) } func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID { c := make(chan peer.ID) - onlyPubsubTopicKey := getKey(pubsubTopic, nil) - pubsubAndContentTopicKey := getKey(pubsubTopic, &contentTopic) + key := getKey(pubsubTopic, contentTopic) f := func() { sub.RLock() defer sub.RUnlock() - if peers, ok := sub.interestMap[onlyPubsubTopicKey]; ok { - for p := range peers { - c <- p - } - } - - if peers, ok := sub.interestMap[pubsubAndContentTopicKey]; ok { + if peers, ok := sub.interestMap[key]; ok { for p := range peers { c <- p } @@ -201,7 +168,7 @@ func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan return c } -func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { +func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic string) { key := getKey(pubsubTopic, contentTopic) peerSet, ok := sub.interestMap[key] if !ok { @@ -211,7 +178,7 @@ func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string, sub.interestMap[key] = peerSet } -func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { +func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic string) { key := getKey(pubsubTopic, contentTopic) _, exists := sub.interestMap[key] if exists { @@ -219,14 +186,11 @@ func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic str } } -func getKey(pubsubTopic string, contentTopic *string) string { +func getKey(pubsubTopic string, contentTopic string) string { pubsubTopicBytes := []byte(pubsubTopic) - if contentTopic == nil { - return hex.EncodeToString(crypto.Keccak256(pubsubTopicBytes)) - } else { - key := append(pubsubTopicBytes, []byte(*contentTopic)...) - return hex.EncodeToString(crypto.Keccak256(key)) - } + key := append(pubsubTopicBytes, []byte(contentTopic)...) + return hex.EncodeToString(crypto.Keccak256(key)) + } func (sub *SubscribersMap) IsFailedPeer(peerID peer.ID) bool {