From 63bb4509bf76398b4389254ca347f479dca40768 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 8 Mar 2023 11:58:51 -0400 Subject: [PATCH] feat(filterv2): ping --- flake.nix | 2 +- waku/v2/protocol/filterv2/client.go | 29 ++++++++++---- waku/v2/protocol/filterv2/common.go | 18 +++++++++ waku/v2/protocol/filterv2/filter_test.go | 40 +++++++++++++++++++ .../v2/protocol/filterv2/subscriptions_map.go | 33 ++++++++++++++- 5 files changed, 112 insertions(+), 10 deletions(-) diff --git a/flake.nix b/flake.nix index 0bbe7e11..049c98a5 100644 --- a/flake.nix +++ b/flake.nix @@ -25,7 +25,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-TvQfLQEYDujfXInQ+i/LoSGtedozZvX8WgzpqiryYHY="; + vendorSha256 = "sha256-yED55+XK/JVplsVcZQin2RBECIUt3XMr0crwt+X2S2Q="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/waku/v2/protocol/filterv2/client.go b/waku/v2/protocol/filterv2/client.go index d792da9c..e3d444c4 100644 --- a/waku/v2/protocol/filterv2/client.go +++ b/waku/v2/protocol/filterv2/client.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "errors" - "fmt" "math" "net/http" "sync" @@ -82,9 +81,6 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error { wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx)) - // wf.wg.Add(1) - // TODO: go wf.keepAliveSubscriptions(ctx) - wf.log.Info("filter protocol (light) started") return nil @@ -175,7 +171,8 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr } if filterSubscribeResponse.StatusCode != http.StatusOK { - return fmt.Errorf("filter err: %d, %s", filterSubscribeResponse.StatusCode, filterSubscribeResponse.StatusDesc) + err := NewFilterError(int(filterSubscribeResponse.StatusCode), filterSubscribeResponse.StatusDesc) + return &err } return nil @@ -210,12 +207,16 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont return nil, err } - return wf.FilterSubscription(params.selectedPeer, contentFilter), nil + return wf.subscriptions.NewSubscription(params.selectedPeer, contentFilter.Topic, contentFilter.ContentTopics), nil } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol -func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) *SubscriptionDetails { - return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics) +func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { + if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics) { + return nil, errors.New("subscription does not exist") + } + + return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics), nil } func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { @@ -232,6 +233,18 @@ func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscrib return params, nil } +func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error { + return wf.request( + ctx, + &FilterSubscribeParameters{selectedPeer: peerID}, + pb.FilterSubscribeRequest_SUBSCRIBER_PING, + ContentFilter{}) +} + +func (wf *WakuFilterLightnode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { + return wf.Ping(ctx, subscription.peerID) +} + // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { if contentFilter.Topic == "" { diff --git a/waku/v2/protocol/filterv2/common.go b/waku/v2/protocol/filterv2/common.go index 50046f63..b087fd9a 100644 --- a/waku/v2/protocol/filterv2/common.go +++ b/waku/v2/protocol/filterv2/common.go @@ -1,4 +1,22 @@ package filterv2 +import "fmt" + const DefaultMaxSubscriptions = 1000 const MaxCriteriaPerSubscription = 1000 + +type FilterError struct { + Code int + Message string +} + +func NewFilterError(code int, message string) FilterError { + return FilterError{ + Code: code, + Message: message, + } +} + +func (e *FilterError) Error() string { + return fmt.Sprintf("error %d: %s", e.Code, e.Message) +} diff --git a/waku/v2/protocol/filterv2/filter_test.go b/waku/v2/protocol/filterv2/filter_test.go index 66acdab9..be39454c 100644 --- a/waku/v2/protocol/filterv2/filter_test.go +++ b/waku/v2/protocol/filterv2/filter_test.go @@ -3,6 +3,7 @@ package filterv2 import ( "context" "crypto/rand" + "net/http" "sync" "testing" "time" @@ -147,6 +148,45 @@ func TestWakuFilter(t *testing.T) { wg.Wait() } +func TestSubscriptionPing(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + defer cancel() + + testTopic := "/waku/2/go/filter/test" + + node1, host1 := makeWakuFilterLightNode(t) + defer node1.Stop() + + broadcaster := v2.NewBroadcaster(10) + node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) + defer node2.Stop() + defer sub2.Unsubscribe() + + node2Filter := NewWakuFilterFullnode(host2, broadcaster, timesource.NewDefaultClock(), utils.Logger()) + err := node2Filter.Start(ctx) + require.NoError(t, err) + + host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1) + require.NoError(t, err) + + err = node1.Ping(context.Background(), host2.ID()) + require.Error(t, err) + filterErr, ok := err.(*FilterError) + require.True(t, ok) + require.Equal(t, filterErr.Code, http.StatusNotFound) + + contentFilter := ContentFilter{ + Topic: string(testTopic), + ContentTopics: []string{"abc"}, + } + _, err = node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID())) + require.NoError(t, err) + + err = node1.Ping(context.Background(), host2.ID()) + require.NoError(t, err) +} + func TestWakuFilterPeerFailure(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds defer cancel() diff --git a/waku/v2/protocol/filterv2/subscriptions_map.go b/waku/v2/protocol/filterv2/subscriptions_map.go index b7b10ab1..dbc95092 100644 --- a/waku/v2/protocol/filterv2/subscriptions_map.go +++ b/waku/v2/protocol/filterv2/subscriptions_map.go @@ -76,6 +76,37 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, conte return details } +func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics []string) bool { + // Check if peer exits + peerSubscription, ok := sub.items[peerID] + if !ok { + return false + } + + // Check if pubsub topic exists + subscriptions, ok := peerSubscription.subscriptionsPerTopic[topic] + if !ok { + return false + } + + // Check if the content topic exists within the list of subscriptions for this peer + for _, ct := range contentTopics { + found := false + for _, subscription := range subscriptions { + _, exists := subscription.contentTopics[ct] + if exists { + found = true + break + } + } + if !found { + return false + } + } + + return true +} + func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { sub.Lock() defer sub.Unlock() @@ -116,7 +147,6 @@ func (s *SubscriptionDetails) closeC() { s.closed = true close(s.C) }) - } func (s *SubscriptionDetails) Close() error { @@ -185,6 +215,7 @@ func iterateSubscriptionSet(subscriptions SubscriptionSet, envelope *protocol.En } if !subscription.closed { + // TODO: consider pushing or dropping if subscription is not available subscription.C <- envelope } }(subscription)