feat(filter): get subscription list

This commit is contained in:
Richard Ramos 2023-05-08 17:33:10 -04:00 committed by RichΛrd
parent 54afef6164
commit 279524f100
5 changed files with 91 additions and 57 deletions

View File

@ -37,6 +37,16 @@ nix develop
```
#### Docker
```
docker run -i -t -p 60000:60000 -p 9000:9000/udp \
statusteam/go-waku:v0.5.2 \ # or, the image:tag of your choice
--dns-discovery:true \
--dns-discovery-url:enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im \
--discv5-discovery
```
or build and run the image with:
```
docker build -t go-waku:latest .

View File

@ -1183,7 +1183,7 @@ Returns a list of multiaddress and enrs given a url to a DNS discoverable ENR tr
**Returns**
A [`JsonResponse`](#jsonresponse-type).
If the execution is successful, the `result` field contains an array objects describing the multiaddresses, enr and peerID each node found has.
If the execution is successful, the `result` field contains an array objects describing the multiaddresses, enr and peerID each node found.
An `error` message otherwise.
```json

View File

@ -82,7 +82,7 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
wf.cancel = cancel
wf.ctx = ctx
wf.subscriptions = NewSubscriptionMap()
wf.subscriptions = NewSubscriptionMap(wf.log)
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx))
@ -279,7 +279,24 @@ func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error {
}
func (wf *WakuFilterLightnode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
return wf.Ping(ctx, subscription.peerID)
return wf.Ping(ctx, subscription.PeerID)
}
func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails {
wf.subscriptions.RLock()
defer wf.subscriptions.RUnlock()
var output []*SubscriptionDetails
for _, peerSubscription := range wf.subscriptions.items {
for _, subscriptionPerTopic := range peerSubscription.subscriptionsPerTopic {
for _, subscriptionDetail := range subscriptionPerTopic {
output = append(output, subscriptionDetail)
}
}
}
return output
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
@ -337,13 +354,13 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
var contentTopics []string
for k := range sub.contentTopics {
for k := range sub.ContentTopics {
contentTopics = append(contentTopics, k)
}
opts = append(opts, Peer(sub.peerID))
opts = append(opts, Peer(sub.PeerID))
return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.pubsubTopic, ContentTopics: contentTopics}, opts...)
return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.PubsubTopic, ContentTopics: contentTopics}, opts...)
}
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions

View File

@ -6,19 +6,20 @@ import (
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)
type SubscriptionDetails struct {
sync.RWMutex
id string
ID string
mapRef *SubscriptionsMap
closed bool
Closed bool
once sync.Once
peerID peer.ID
pubsubTopic string
contentTopics map[string]struct{}
PeerID peer.ID
PubsubTopic string
ContentTopics map[string]struct{}
C chan *protocol.Envelope
}
@ -31,12 +32,14 @@ type PeerSubscription struct {
type SubscriptionsMap struct {
sync.RWMutex
items map[peer.ID]*PeerSubscription
logger *zap.Logger
items map[peer.ID]*PeerSubscription
}
func NewSubscriptionMap() *SubscriptionsMap {
func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
return &SubscriptionsMap{
items: make(map[peer.ID]*PeerSubscription),
logger: logger.Named("subscription-map"),
items: make(map[peer.ID]*PeerSubscription),
}
}
@ -59,19 +62,19 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, conte
}
details := &SubscriptionDetails{
id: uuid.NewString(),
ID: uuid.NewString(),
mapRef: sub,
peerID: peerID,
pubsubTopic: topic,
C: make(chan *protocol.Envelope),
contentTopics: make(map[string]struct{}),
PeerID: peerID,
PubsubTopic: topic,
C: make(chan *protocol.Envelope, 1024),
ContentTopics: make(map[string]struct{}),
}
for _, ct := range contentTopics {
details.contentTopics[ct] = struct{}{}
details.ContentTopics[ct] = struct{}{}
}
sub.items[peerID].subscriptionsPerTopic[topic][details.id] = details
sub.items[peerID].subscriptionsPerTopic[topic][details.ID] = details
return details
}
@ -104,7 +107,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ...
for _, ct := range contentTopics {
found := false
for _, subscription := range subscriptions {
_, exists := subscription.contentTopics[ct]
_, exists := subscription.ContentTopics[ct]
if exists {
found = true
break
@ -122,12 +125,12 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.Lock()
defer sub.Unlock()
peerSubscription, ok := sub.items[subscription.peerID]
peerSubscription, ok := sub.items[subscription.PeerID]
if !ok {
return ErrNotFound
}
delete(peerSubscription.subscriptionsPerTopic[subscription.pubsubTopic], subscription.id)
delete(peerSubscription.subscriptionsPerTopic[subscription.PubsubTopic], subscription.ID)
return nil
}
@ -137,7 +140,7 @@ func (s *SubscriptionDetails) Add(contentTopics ...string) {
defer s.Unlock()
for _, ct := range contentTopics {
s.contentTopics[ct] = struct{}{}
s.ContentTopics[ct] = struct{}{}
}
}
@ -146,7 +149,7 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) {
defer s.Unlock()
for _, ct := range contentTopics {
delete(s.contentTopics, ct)
delete(s.ContentTopics, ct)
}
}
@ -155,7 +158,7 @@ func (s *SubscriptionDetails) closeC() {
s.Lock()
defer s.Unlock()
s.closed = true
s.Closed = true
close(s.C)
})
}
@ -170,17 +173,17 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails {
defer s.RUnlock()
result := &SubscriptionDetails{
id: uuid.NewString(),
ID: uuid.NewString(),
mapRef: s.mapRef,
closed: false,
peerID: s.peerID,
pubsubTopic: s.pubsubTopic,
contentTopics: make(map[string]struct{}),
Closed: false,
PeerID: s.PeerID,
PubsubTopic: s.PubsubTopic,
ContentTopics: make(map[string]struct{}),
C: make(chan *protocol.Envelope),
}
for k := range s.contentTopics {
result.contentTopics[k] = struct{}{}
for k := range s.ContentTopics {
result.ContentTopics[k] = struct{}{}
}
return result
@ -210,24 +213,27 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope)
subscriptions, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()]
if ok {
iterateSubscriptionSet(subscriptions, envelope)
iterateSubscriptionSet(sub.logger, subscriptions, envelope)
}
}
func iterateSubscriptionSet(subscriptions SubscriptionSet, envelope *protocol.Envelope) {
func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) {
for _, subscription := range subscriptions {
func(subscription *SubscriptionDetails) {
subscription.RLock()
defer subscription.RUnlock()
_, ok := subscription.contentTopics[envelope.Message().ContentTopic]
if !ok && len(subscription.contentTopics) != 0 { // TODO: confirm if no content topics are allowed
_, ok := subscription.ContentTopics[envelope.Message().ContentTopic]
if !ok && len(subscription.ContentTopics) != 0 {
return
}
if !subscription.closed {
// TODO: consider pushing or dropping if subscription is not available
subscription.C <- envelope
if !subscription.Closed {
select {
case subscription.C <- envelope:
default:
logger.Warn("can't deliver message to subscription. subscriber too slow")
}
}
}(subscription)
}

View File

@ -9,40 +9,41 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestSubscriptionMapAppend(t *testing.T) {
fmap := NewSubscriptionMap()
fmap := NewSubscriptionMap(utils.Logger())
peerId := createPeerId(t)
contentTopics := []string{"ct1", "ct2"}
sub := fmap.NewSubscription(peerId, TOPIC, contentTopics)
_, found := sub.contentTopics["ct1"]
_, found := sub.ContentTopics["ct1"]
require.True(t, found)
_, found = sub.contentTopics["ct2"]
_, found = sub.ContentTopics["ct2"]
require.True(t, found)
require.False(t, sub.closed)
require.Equal(t, sub.peerID, peerId)
require.Equal(t, sub.pubsubTopic, TOPIC)
require.False(t, sub.Closed)
require.Equal(t, sub.PeerID, peerId)
require.Equal(t, sub.PubsubTopic, TOPIC)
sub.Add("ct3")
_, found = sub.contentTopics["ct3"]
_, found = sub.ContentTopics["ct3"]
require.True(t, found)
sub.Remove("ct3")
_, found = sub.contentTopics["ct3"]
_, found = sub.ContentTopics["ct3"]
require.False(t, found)
err := sub.Close()
require.NoError(t, err)
require.True(t, sub.closed)
require.True(t, sub.Closed)
err = sub.Close()
require.NoError(t, err)
}
func TestSubscriptionClear(t *testing.T) {
fmap := NewSubscriptionMap()
fmap := NewSubscriptionMap(utils.Logger())
contentTopics := []string{"ct1", "ct2"}
var subscriptions = []*SubscriptionDetails{
@ -73,13 +74,13 @@ func TestSubscriptionClear(t *testing.T) {
wg.Wait()
require.True(t, subscriptions[0].closed)
require.True(t, subscriptions[1].closed)
require.True(t, subscriptions[2].closed)
require.True(t, subscriptions[0].Closed)
require.True(t, subscriptions[1].Closed)
require.True(t, subscriptions[2].Closed)
}
func TestSubscriptionsNotify(t *testing.T) {
fmap := NewSubscriptionMap()
fmap := NewSubscriptionMap(utils.Logger())
p1 := createPeerId(t)
p2 := createPeerId(t)
var subscriptions = []*SubscriptionDetails{
@ -95,7 +96,7 @@ func TestSubscriptionsNotify(t *testing.T) {
successOnReceive := func(ctx context.Context, i int) {
defer wg.Done()
if subscriptions[i].closed {
if subscriptions[i].Closed {
successChan <- struct{}{}
return
}
@ -115,7 +116,7 @@ func TestSubscriptionsNotify(t *testing.T) {
failOnReceive := func(ctx context.Context, i int) {
defer wg.Done()
if subscriptions[i].closed {
if subscriptions[i].Closed {
successChan <- struct{}{}
return
}