From 279524f10078c9cf50b12a7459a39c209aeecf54 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 8 May 2023 17:33:10 -0400 Subject: [PATCH] feat(filter): get subscription list --- README.md | 10 +++ library/README.md | 2 +- waku/v2/protocol/filter/client.go | 27 +++++-- waku/v2/protocol/filter/subscriptions_map.go | 76 ++++++++++--------- .../protocol/filter/subscriptions_map_test.go | 33 ++++---- 5 files changed, 91 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 23b77c5e..f26a8c98 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,16 @@ nix develop ``` #### Docker +``` +docker run -i -t -p 60000:60000 -p 9000:9000/udp \ + statusteam/go-waku:v0.5.2 \ # or, the image:tag of your choice + --dns-discovery:true \ + --dns-discovery-url:enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im \ + --discv5-discovery +``` + +or build and run the image with: + ``` docker build -t go-waku:latest . diff --git a/library/README.md b/library/README.md index cf0abb47..0511cf0a 100644 --- a/library/README.md +++ b/library/README.md @@ -1183,7 +1183,7 @@ Returns a list of multiaddress and enrs given a url to a DNS discoverable ENR tr **Returns** A [`JsonResponse`](#jsonresponse-type). -If the execution is successful, the `result` field contains an array objects describing the multiaddresses, enr and peerID each node found has. +If the execution is successful, the `result` field contains an array objects describing the multiaddresses, enr and peerID each node found. An `error` message otherwise. ```json diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index d85d8d1e..15922d10 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -82,7 +82,7 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) wf.cancel = cancel wf.ctx = ctx - wf.subscriptions = NewSubscriptionMap() + wf.subscriptions = NewSubscriptionMap(wf.log) wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx)) @@ -279,7 +279,24 @@ func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error { } func (wf *WakuFilterLightnode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { - return wf.Ping(ctx, subscription.peerID) + return wf.Ping(ctx, subscription.PeerID) +} + +func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails { + wf.subscriptions.RLock() + defer wf.subscriptions.RUnlock() + + var output []*SubscriptionDetails + + for _, peerSubscription := range wf.subscriptions.items { + for _, subscriptionPerTopic := range peerSubscription.subscriptionsPerTopic { + for _, subscriptionDetail := range subscriptionPerTopic { + output = append(output, subscriptionDetail) + } + } + } + + return output } // Unsubscribe is used to stop receiving messages from a peer that match a content filter @@ -337,13 +354,13 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { var contentTopics []string - for k := range sub.contentTopics { + for k := range sub.ContentTopics { contentTopics = append(contentTopics, k) } - opts = append(opts, Peer(sub.peerID)) + opts = append(opts, Peer(sub.PeerID)) - return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.pubsubTopic, ContentTopics: contentTopics}, opts...) + return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.PubsubTopic, ContentTopics: contentTopics}, opts...) } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions diff --git a/waku/v2/protocol/filter/subscriptions_map.go b/waku/v2/protocol/filter/subscriptions_map.go index c0afbbcb..8ab0e35c 100644 --- a/waku/v2/protocol/filter/subscriptions_map.go +++ b/waku/v2/protocol/filter/subscriptions_map.go @@ -6,19 +6,20 @@ import ( "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" ) type SubscriptionDetails struct { sync.RWMutex - id string + ID string mapRef *SubscriptionsMap - closed bool + Closed bool once sync.Once - peerID peer.ID - pubsubTopic string - contentTopics map[string]struct{} + PeerID peer.ID + PubsubTopic string + ContentTopics map[string]struct{} C chan *protocol.Envelope } @@ -31,12 +32,14 @@ type PeerSubscription struct { type SubscriptionsMap struct { sync.RWMutex - items map[peer.ID]*PeerSubscription + logger *zap.Logger + items map[peer.ID]*PeerSubscription } -func NewSubscriptionMap() *SubscriptionsMap { +func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap { return &SubscriptionsMap{ - items: make(map[peer.ID]*PeerSubscription), + logger: logger.Named("subscription-map"), + items: make(map[peer.ID]*PeerSubscription), } } @@ -59,19 +62,19 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, conte } details := &SubscriptionDetails{ - id: uuid.NewString(), + ID: uuid.NewString(), mapRef: sub, - peerID: peerID, - pubsubTopic: topic, - C: make(chan *protocol.Envelope), - contentTopics: make(map[string]struct{}), + PeerID: peerID, + PubsubTopic: topic, + C: make(chan *protocol.Envelope, 1024), + ContentTopics: make(map[string]struct{}), } for _, ct := range contentTopics { - details.contentTopics[ct] = struct{}{} + details.ContentTopics[ct] = struct{}{} } - sub.items[peerID].subscriptionsPerTopic[topic][details.id] = details + sub.items[peerID].subscriptionsPerTopic[topic][details.ID] = details return details } @@ -104,7 +107,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ... for _, ct := range contentTopics { found := false for _, subscription := range subscriptions { - _, exists := subscription.contentTopics[ct] + _, exists := subscription.ContentTopics[ct] if exists { found = true break @@ -122,12 +125,12 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { sub.Lock() defer sub.Unlock() - peerSubscription, ok := sub.items[subscription.peerID] + peerSubscription, ok := sub.items[subscription.PeerID] if !ok { return ErrNotFound } - delete(peerSubscription.subscriptionsPerTopic[subscription.pubsubTopic], subscription.id) + delete(peerSubscription.subscriptionsPerTopic[subscription.PubsubTopic], subscription.ID) return nil } @@ -137,7 +140,7 @@ func (s *SubscriptionDetails) Add(contentTopics ...string) { defer s.Unlock() for _, ct := range contentTopics { - s.contentTopics[ct] = struct{}{} + s.ContentTopics[ct] = struct{}{} } } @@ -146,7 +149,7 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) { defer s.Unlock() for _, ct := range contentTopics { - delete(s.contentTopics, ct) + delete(s.ContentTopics, ct) } } @@ -155,7 +158,7 @@ func (s *SubscriptionDetails) closeC() { s.Lock() defer s.Unlock() - s.closed = true + s.Closed = true close(s.C) }) } @@ -170,17 +173,17 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails { defer s.RUnlock() result := &SubscriptionDetails{ - id: uuid.NewString(), + ID: uuid.NewString(), mapRef: s.mapRef, - closed: false, - peerID: s.peerID, - pubsubTopic: s.pubsubTopic, - contentTopics: make(map[string]struct{}), + Closed: false, + PeerID: s.PeerID, + PubsubTopic: s.PubsubTopic, + ContentTopics: make(map[string]struct{}), C: make(chan *protocol.Envelope), } - for k := range s.contentTopics { - result.contentTopics[k] = struct{}{} + for k := range s.ContentTopics { + result.ContentTopics[k] = struct{}{} } return result @@ -210,24 +213,27 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) subscriptions, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()] if ok { - iterateSubscriptionSet(subscriptions, envelope) + iterateSubscriptionSet(sub.logger, subscriptions, envelope) } } -func iterateSubscriptionSet(subscriptions SubscriptionSet, envelope *protocol.Envelope) { +func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) { for _, subscription := range subscriptions { func(subscription *SubscriptionDetails) { subscription.RLock() defer subscription.RUnlock() - _, ok := subscription.contentTopics[envelope.Message().ContentTopic] - if !ok && len(subscription.contentTopics) != 0 { // TODO: confirm if no content topics are allowed + _, ok := subscription.ContentTopics[envelope.Message().ContentTopic] + if !ok && len(subscription.ContentTopics) != 0 { return } - if !subscription.closed { - // TODO: consider pushing or dropping if subscription is not available - subscription.C <- envelope + if !subscription.Closed { + select { + case subscription.C <- envelope: + default: + logger.Warn("can't deliver message to subscription. subscriber too slow") + } } }(subscription) } diff --git a/waku/v2/protocol/filter/subscriptions_map_test.go b/waku/v2/protocol/filter/subscriptions_map_test.go index c711b826..32b85f85 100644 --- a/waku/v2/protocol/filter/subscriptions_map_test.go +++ b/waku/v2/protocol/filter/subscriptions_map_test.go @@ -9,40 +9,41 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" ) func TestSubscriptionMapAppend(t *testing.T) { - fmap := NewSubscriptionMap() + fmap := NewSubscriptionMap(utils.Logger()) peerId := createPeerId(t) contentTopics := []string{"ct1", "ct2"} sub := fmap.NewSubscription(peerId, TOPIC, contentTopics) - _, found := sub.contentTopics["ct1"] + _, found := sub.ContentTopics["ct1"] require.True(t, found) - _, found = sub.contentTopics["ct2"] + _, found = sub.ContentTopics["ct2"] require.True(t, found) - require.False(t, sub.closed) - require.Equal(t, sub.peerID, peerId) - require.Equal(t, sub.pubsubTopic, TOPIC) + require.False(t, sub.Closed) + require.Equal(t, sub.PeerID, peerId) + require.Equal(t, sub.PubsubTopic, TOPIC) sub.Add("ct3") - _, found = sub.contentTopics["ct3"] + _, found = sub.ContentTopics["ct3"] require.True(t, found) sub.Remove("ct3") - _, found = sub.contentTopics["ct3"] + _, found = sub.ContentTopics["ct3"] require.False(t, found) err := sub.Close() require.NoError(t, err) - require.True(t, sub.closed) + require.True(t, sub.Closed) err = sub.Close() require.NoError(t, err) } func TestSubscriptionClear(t *testing.T) { - fmap := NewSubscriptionMap() + fmap := NewSubscriptionMap(utils.Logger()) contentTopics := []string{"ct1", "ct2"} var subscriptions = []*SubscriptionDetails{ @@ -73,13 +74,13 @@ func TestSubscriptionClear(t *testing.T) { wg.Wait() - require.True(t, subscriptions[0].closed) - require.True(t, subscriptions[1].closed) - require.True(t, subscriptions[2].closed) + require.True(t, subscriptions[0].Closed) + require.True(t, subscriptions[1].Closed) + require.True(t, subscriptions[2].Closed) } func TestSubscriptionsNotify(t *testing.T) { - fmap := NewSubscriptionMap() + fmap := NewSubscriptionMap(utils.Logger()) p1 := createPeerId(t) p2 := createPeerId(t) var subscriptions = []*SubscriptionDetails{ @@ -95,7 +96,7 @@ func TestSubscriptionsNotify(t *testing.T) { successOnReceive := func(ctx context.Context, i int) { defer wg.Done() - if subscriptions[i].closed { + if subscriptions[i].Closed { successChan <- struct{}{} return } @@ -115,7 +116,7 @@ func TestSubscriptionsNotify(t *testing.T) { failOnReceive := func(ctx context.Context, i int) { defer wg.Done() - if subscriptions[i].closed { + if subscriptions[i].Closed { successChan <- struct{}{} return }