fix: pubsub topic and content topics are always required when doing filter requests

This commit is contained in:
Richard Ramos 2023-02-14 11:11:26 -04:00 committed by RichΛrd
parent 52f7c8d86e
commit b816434843
3 changed files with 104 additions and 76 deletions

View File

@ -4,7 +4,9 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt"
"math" "math"
"net/http"
"sync" "sync"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
@ -46,6 +48,11 @@ type ContentFilter struct {
ContentTopics []string ContentTopics []string
} }
type WakuFilterPushResult struct {
err error
peerID peer.ID
}
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterPush { func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterPush {
wf := new(WakuFilterPush) wf := new(WakuFilterPush)
@ -141,8 +148,10 @@ func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribePa
if err != nil { if err != nil {
return err return err
} }
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn) writer := protoio.NewDelimitedWriter(conn)
reader := protoio.NewDelimitedReader(conn, math.MaxInt32)
request := &pb.FilterSubscribeRequest{ request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestId), RequestId: hex.EncodeToString(params.requestId),
@ -158,13 +167,29 @@ func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribePa
return err return err
} }
defer conn.Close() filterSubscribeResponse := &pb.FilterSubscribeResponse{}
err = reader.ReadMsg(filterSubscribeResponse)
if err != nil {
wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err))
return err
}
if filterSubscribeResponse.StatusCode != http.StatusOK {
return fmt.Errorf("filter err: %d, %s", filterSubscribeResponse.StatusCode, filterSubscribeResponse.StatusDesc)
}
return nil return nil
} }
// Subscribe setups a subscription to receive messages that match a specific content filter // Subscribe setups a subscription to receive messages that match a specific content filter
func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) error { func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) error {
// TODO: validate content filters if contentFilter.Topic == "" {
return errors.New("topic is required")
}
if len(contentFilter.ContentTopics) == 0 {
return errors.New("at least one content topic is required")
}
params := new(FilterSubscribeParameters) params := new(FilterSubscribeParameters)
params.log = wf.log params.log = wf.log
@ -208,21 +233,31 @@ func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOpti
} }
// Unsubscribe is used to stop receiving messages from a peer that match a content filter // Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) error { func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (error, <-chan WakuFilterPushResult) {
// TODO: checks if a subscription exists with the chosen criteria if contentFilter.Topic == "" {
return errors.New("topic is required"), nil
}
if len(contentFilter.ContentTopics) == 0 {
return errors.New("at least one content topic is required"), nil
}
params, err := wf.getUnsubscribeParameters(opts...) params, err := wf.getUnsubscribeParameters(opts...)
if err != nil { if err != nil {
return err return err, nil
} }
localWg := sync.WaitGroup{}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
for peerID := range wf.subscriptions.items { for peerID := range wf.subscriptions.items {
if !params.unsubscribeAll && peerID != params.selectedPeer { if !params.unsubscribeAll && peerID != params.selectedPeer {
continue continue
} }
localWg.Add(1)
go func(peerID peer.ID) { go func(peerID peer.ID) {
defer wf.wg.Done() defer localWg.Done()
err := wf.request( err := wf.request(
ctx, ctx,
&FilterSubscribeParameters{selectedPeer: peerID}, &FilterSubscribeParameters{selectedPeer: peerID},
@ -231,28 +266,38 @@ func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter Content
if err != nil { if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
} }
resultChan <- WakuFilterPushResult{
err: err,
peerID: peerID,
}
}(peerID) }(peerID)
} }
return nil localWg.Wait()
close(resultChan)
return nil, resultChan
} }
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) error { func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...) params, err := wf.getUnsubscribeParameters(opts...)
if err != nil { if err != nil {
return err return nil, err
} }
wf.subscriptions.Lock() wf.subscriptions.Lock()
defer wf.subscriptions.Unlock() defer wf.subscriptions.Unlock()
wf.wg.Add(len(wf.subscriptions.items)) localWg := sync.WaitGroup{}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
for peerID := range wf.subscriptions.items { for peerID := range wf.subscriptions.items {
if !params.unsubscribeAll && peerID != params.selectedPeer { if !params.unsubscribeAll && peerID != params.selectedPeer {
continue continue
} }
localWg.Add(1)
go func(peerID peer.ID) { go func(peerID peer.ID) {
defer wf.wg.Done() defer wf.wg.Done()
err := wf.request( err := wf.request(
@ -263,8 +308,16 @@ func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsu
if err != nil { if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
} }
resultChan <- WakuFilterPushResult{
err: err,
peerID: peerID,
}
}(peerID) }(peerID)
} }
return nil localWg.Wait()
close(resultChan)
return resultChan, nil
} }

View File

@ -21,7 +21,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
// FilterSubscribeID_v20beta1 is the current Waku Filter protocol identifier for servers to // FilterSubscribeID_v20beta1 is the current Waku Filter protocol identifier for servers to
@ -146,6 +145,12 @@ func (wf *WakuFilter) ping(s network.Stream, logger *zap.Logger, request *pb.Fil
func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" { if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}
if len(request.ContentTopics) == 0 {
reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
} }
peerID := s.Conn().RemotePeer() peerID := s.Conn().RemotePeer()
@ -158,6 +163,12 @@ func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *p
func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" { if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty") reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}
if len(request.ContentTopics) == 0 {
reply(s, logger, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
} }
err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics) err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics)
@ -186,7 +197,6 @@ func (wf *WakuFilter) filterListener(ctx context.Context) {
msg := envelope.Message() msg := envelope.Message()
pubsubTopic := envelope.PubsubTopic() pubsubTopic := envelope.PubsubTopic()
logger := wf.log.With(logging.HexBytes("envelopeHash", envelope.Hash())) logger := wf.log.With(logging.HexBytes("envelopeHash", envelope.Hash()))
g := new(errgroup.Group)
// Each subscriber is a light node that earlier on invoked // Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node // a FilterRequest on this node
@ -195,16 +205,17 @@ func (wf *WakuFilter) filterListener(ctx context.Context) {
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
// Do a message push to light node // Do a message push to light node
logger.Info("pushing message to light node") logger.Info("pushing message to light node")
g.Go(func() (err error) { wf.wg.Add(1)
err = wf.pushMessage(ctx, subscriber, envelope) go func(subscriber peer.ID) {
defer wf.wg.Done()
err := wf.pushMessage(ctx, subscriber, envelope)
if err != nil { if err != nil {
logger.Error("pushing message", zap.Error(err)) logger.Error("pushing message", zap.Error(err))
} }
return err }(subscriber)
})
} }
return g.Wait() return nil
} }
for m := range wf.msgC { for m := range wf.msgC {

View File

@ -59,14 +59,9 @@ func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics
sub.items[peerID] = pubsubTopicMap sub.items[peerID] = pubsubTopicMap
if len(contentTopics) == 0 {
// Interested in all messages for a pubsub topic
sub.addToInterestMap(peerID, pubsubTopic, nil)
} else {
for _, c := range contentTopics { for _, c := range contentTopics {
c := c c := c
sub.addToInterestMap(peerID, pubsubTopic, &c) sub.addToInterestMap(peerID, pubsubTopic, c)
}
} }
} }
@ -102,29 +97,16 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop
return ErrNotFound return ErrNotFound
} }
if len(contentTopics) == 0 {
// Remove all content topics related to this pubsub topic
for c := range contentTopicsMap {
c := c
delete(contentTopicsMap, c)
sub.removeFromInterestMap(peerID, pubsubTopic, &c)
}
delete(pubsubTopicMap, pubsubTopic)
sub.removeFromInterestMap(peerID, pubsubTopic, nil)
} else {
// Removing content topics individually // Removing content topics individually
for _, c := range contentTopics { for _, c := range contentTopics {
c := c c := c
delete(contentTopicsMap, c) delete(contentTopicsMap, c)
sub.removeFromInterestMap(peerID, pubsubTopic, &c) sub.removeFromInterestMap(peerID, pubsubTopic, c)
} }
// No more content topics available. Removing subscription completely // No more content topics available. Removing content topic completely
if len(contentTopicsMap) == 0 { if len(contentTopicsMap) == 0 {
delete(pubsubTopicMap, pubsubTopic) delete(pubsubTopicMap, pubsubTopic)
sub.removeFromInterestMap(peerID, pubsubTopic, nil)
}
} }
pubsubTopicMap[pubsubTopic] = contentTopicsMap pubsubTopicMap[pubsubTopic] = contentTopicsMap
@ -142,13 +124,8 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error {
for pubsubTopic, contentTopicsMap := range pubsubTopicMap { for pubsubTopic, contentTopicsMap := range pubsubTopicMap {
// Remove all content topics related to this pubsub topic // Remove all content topics related to this pubsub topic
for c := range contentTopicsMap { for c := range contentTopicsMap {
c := c sub.removeFromInterestMap(peerID, pubsubTopic, c)
delete(contentTopicsMap, c)
sub.removeFromInterestMap(peerID, pubsubTopic, &c)
} }
delete(pubsubTopicMap, pubsubTopic)
sub.removeFromInterestMap(peerID, pubsubTopic, nil)
} }
delete(sub.items, peerID) delete(sub.items, peerID)
@ -167,29 +144,19 @@ func (sub *SubscribersMap) RemoveAll() {
sub.Lock() sub.Lock()
defer sub.Unlock() defer sub.Unlock()
for k /*, _ v*/ := range sub.items { sub.items = make(map[peer.ID]PubsubTopics)
//close(v.Chan)
delete(sub.items, k)
}
} }
func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID { func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID {
c := make(chan peer.ID) c := make(chan peer.ID)
onlyPubsubTopicKey := getKey(pubsubTopic, nil) key := getKey(pubsubTopic, contentTopic)
pubsubAndContentTopicKey := getKey(pubsubTopic, &contentTopic)
f := func() { f := func() {
sub.RLock() sub.RLock()
defer sub.RUnlock() defer sub.RUnlock()
if peers, ok := sub.interestMap[onlyPubsubTopicKey]; ok { if peers, ok := sub.interestMap[key]; ok {
for p := range peers {
c <- p
}
}
if peers, ok := sub.interestMap[pubsubAndContentTopicKey]; ok {
for p := range peers { for p := range peers {
c <- p c <- p
} }
@ -201,7 +168,7 @@ func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan
return c return c
} }
func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic string) {
key := getKey(pubsubTopic, contentTopic) key := getKey(pubsubTopic, contentTopic)
peerSet, ok := sub.interestMap[key] peerSet, ok := sub.interestMap[key]
if !ok { if !ok {
@ -211,7 +178,7 @@ func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string,
sub.interestMap[key] = peerSet sub.interestMap[key] = peerSet
} }
func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic string) {
key := getKey(pubsubTopic, contentTopic) key := getKey(pubsubTopic, contentTopic)
_, exists := sub.interestMap[key] _, exists := sub.interestMap[key]
if exists { if exists {
@ -219,14 +186,11 @@ func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic str
} }
} }
func getKey(pubsubTopic string, contentTopic *string) string { func getKey(pubsubTopic string, contentTopic string) string {
pubsubTopicBytes := []byte(pubsubTopic) pubsubTopicBytes := []byte(pubsubTopic)
if contentTopic == nil { key := append(pubsubTopicBytes, []byte(contentTopic)...)
return hex.EncodeToString(crypto.Keccak256(pubsubTopicBytes))
} else {
key := append(pubsubTopicBytes, []byte(*contentTopic)...)
return hex.EncodeToString(crypto.Keccak256(key)) return hex.EncodeToString(crypto.Keccak256(key))
}
} }
func (sub *SubscribersMap) IsFailedPeer(peerID peer.ID) bool { func (sub *SubscribersMap) IsFailedPeer(peerID peer.ID) bool {