mirror of https://github.com/status-im/go-waku.git
feat: change UnsubscribeWithSubscription so that it's single sub-specific
Also merge FilterSubscribe and FilterUnsubscribe options/params
This commit is contained in:
parent
e0ba66791d
commit
16ec22596e
|
@ -134,13 +134,13 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
var fOptions []filter.FilterUnsubscribeOption
|
var fOptions []filter.FilterSubscribeOption
|
||||||
if peerID != "" {
|
if peerID != "" {
|
||||||
p, err := peer.Decode(peerID)
|
p, err := peer.Decode(peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fOptions = append(fOptions, filter.Peer(p))
|
fOptions = append(fOptions, filter.WithPeer(p))
|
||||||
} else {
|
} else {
|
||||||
return errors.New("peerID is required")
|
return errors.New("peerID is required")
|
||||||
}
|
}
|
||||||
|
@ -176,13 +176,13 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
var fOptions []filter.FilterUnsubscribeOption
|
var fOptions []filter.FilterSubscribeOption
|
||||||
if peerID != "" {
|
if peerID != "" {
|
||||||
p, err := peer.Decode(peerID)
|
p, err := peer.Decode(peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
fOptions = append(fOptions, filter.Peer(p))
|
fOptions = append(fOptions, filter.WithPeer(p))
|
||||||
} else {
|
} else {
|
||||||
fOptions = append(fOptions, filter.UnsubscribeAll())
|
fOptions = append(fOptions, filter.UnsubscribeAll())
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,10 @@ func (cf ContentFilter) ContentTopicsList() []string {
|
||||||
return maps.Keys(cf.ContentTopics)
|
return maps.Keys(cf.ContentTopics)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
|
||||||
|
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
|
||||||
|
}
|
||||||
|
|
||||||
type WakuFilterPushResult struct {
|
type WakuFilterPushResult struct {
|
||||||
Err error
|
Err error
|
||||||
PeerID peer.ID
|
PeerID peer.ID
|
||||||
|
@ -149,7 +153,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
|
||||||
} else {
|
} else {
|
||||||
pubSubTopic = *messagePush.PubsubTopic
|
pubSubTopic = *messagePush.PubsubTopic
|
||||||
}
|
}
|
||||||
if !wf.subscriptions.Has(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage.ContentTopic) {
|
if !wf.subscriptions.Has(s.Conn().RemotePeer(), NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
|
||||||
logger.Warn("received messagepush with invalid subscription parameters",
|
logger.Warn("received messagepush with invalid subscription parameters",
|
||||||
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
|
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
|
||||||
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
|
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
|
||||||
|
@ -304,21 +308,14 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
|
||||||
var cFilter ContentFilter
|
var cFilter ContentFilter
|
||||||
cFilter.PubsubTopic = pubSubTopic
|
cFilter.PubsubTopic = pubSubTopic
|
||||||
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
|
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
|
||||||
existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter)
|
|
||||||
if existingSub != nil {
|
|
||||||
subscriptions = append(subscriptions, existingSub)
|
|
||||||
} else {
|
|
||||||
//TO OPTIMIZE: Should we parallelize these, if so till how many batches?
|
|
||||||
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
|
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wf.log.Error("Failed to subscribe for conentTopics ",
|
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
|
||||||
zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
|
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
failedContentTopics = append(failedContentTopics, cTopics...)
|
failedContentTopics = append(failedContentTopics, cTopics...)
|
||||||
}
|
}
|
||||||
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
|
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if len(failedContentTopics) > 0 {
|
if len(failedContentTopics) > 0 {
|
||||||
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
|
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
|
||||||
|
@ -335,15 +332,15 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !wf.subscriptions.Has(peerID, contentFilter.PubsubTopic, contentFilter.ContentTopicsList()...) {
|
if !wf.subscriptions.Has(peerID, contentFilter) {
|
||||||
return nil, errors.New("subscription does not exist")
|
return nil, errors.New("subscription does not exist")
|
||||||
}
|
}
|
||||||
|
|
||||||
return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
|
return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
|
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) {
|
||||||
params := new(FilterUnsubscribeParameters)
|
params := new(FilterSubscribeParameters)
|
||||||
params.log = wf.log
|
params.log = wf.log
|
||||||
opts = append(DefaultUnsubscribeOptions(), opts...)
|
opts = append(DefaultUnsubscribeOptions(), opts...)
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
|
@ -418,21 +415,18 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte
|
||||||
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
|
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
|
||||||
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
|
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
|
||||||
delete(subscriptionDetailList, subscriptionDetailID)
|
delete(subscriptionDetailList, subscriptionDetailID)
|
||||||
} else {
|
subscriptionDetail.closeC()
|
||||||
subscriptionDetailList[subscriptionDetailID] = subscriptionDetail
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(subscriptionDetailList) == 0 {
|
if len(subscriptionDetailList) == 0 {
|
||||||
delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic)
|
delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic)
|
||||||
} else {
|
|
||||||
wf.subscriptions.items[peerID].subsPerPubsubTopic[contentFilter.PubsubTopic] = subscriptionDetailList
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
|
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
|
||||||
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
|
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||||
wf.RLock()
|
wf.RLock()
|
||||||
defer wf.RUnlock()
|
defer wf.RUnlock()
|
||||||
if err := wf.ErrOnNotRunning(); err != nil {
|
if err := wf.ErrOnNotRunning(); err != nil {
|
||||||
|
@ -456,12 +450,9 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
|
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
|
||||||
for pTopic, cTopics := range pubSubTopicMap {
|
for pTopic, cTopics := range pubSubTopicMap {
|
||||||
var cFilter ContentFilter
|
cFilter := NewContentFilter(pTopic, cTopics...)
|
||||||
cFilter.PubsubTopic = pTopic
|
|
||||||
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
|
|
||||||
for peerID := range wf.subscriptions.items {
|
for peerID := range wf.subscriptions.items {
|
||||||
if params.selectedPeer != "" && peerID != params.selectedPeer {
|
if params.selectedPeer != "" && peerID != params.selectedPeer {
|
||||||
continue
|
continue
|
||||||
|
@ -487,21 +478,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
|
||||||
params.wg.Done()
|
params.wg.Done()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter)
|
||||||
err := wf.request(
|
|
||||||
ctx,
|
|
||||||
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
|
|
||||||
pb.FilterSubscribeRequest_UNSUBSCRIBE,
|
|
||||||
cFilter)
|
|
||||||
if err != nil {
|
|
||||||
ferr, ok := err.(*FilterError)
|
|
||||||
if ok && ferr.Code == http.StatusNotFound {
|
|
||||||
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err))
|
|
||||||
} else {
|
|
||||||
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if params.wg != nil {
|
if params.wg != nil {
|
||||||
resultChan <- WakuFilterPushResult{
|
resultChan <- WakuFilterPushResult{
|
||||||
|
@ -521,20 +498,54 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
|
||||||
return resultChan, nil
|
return resultChan, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
|
// UnsubscribeWithSubscription is used to close a particular subscription
|
||||||
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
|
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
|
||||||
|
// server unsubscribe is also performed
|
||||||
|
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||||
wf.RLock()
|
wf.RLock()
|
||||||
defer wf.RUnlock()
|
defer wf.RUnlock()
|
||||||
if err := wf.ErrOnNotRunning(); err != nil {
|
if err := wf.ErrOnNotRunning(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
opts = append(opts, Peer(sub.PeerID))
|
params, err := wf.getUnsubscribeParameters(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close this sub
|
||||||
|
sub.Close()
|
||||||
|
|
||||||
|
resultChan := make(chan WakuFilterPushResult, 1)
|
||||||
|
|
||||||
|
if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
|
||||||
|
// Last sub for this [peer, contentFilter] pair
|
||||||
|
err = wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: sub.PeerID, requestID: params.requestID}, sub.ContentFilter)
|
||||||
|
resultChan <- WakuFilterPushResult{
|
||||||
|
Err: err,
|
||||||
|
PeerID: sub.PeerID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(resultChan)
|
||||||
|
return resultChan, err
|
||||||
|
|
||||||
return wf.Unsubscribe(ctx, sub.ContentFilter, opts...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
|
func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter ContentFilter) error {
|
||||||
|
err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter)
|
||||||
|
if err != nil {
|
||||||
|
ferr, ok := err.(*FilterError)
|
||||||
|
if ok && ferr.Code == http.StatusNotFound {
|
||||||
|
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", params.selectedPeer), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", params.selectedPeer), zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||||
params, err := wf.getUnsubscribeParameters(opts...)
|
params, err := wf.getUnsubscribeParameters(opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -590,7 +601,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
|
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
|
||||||
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
|
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||||
wf.RLock()
|
wf.RLock()
|
||||||
defer wf.RUnlock()
|
defer wf.RUnlock()
|
||||||
if err := wf.ErrOnNotRunning(); err != nil {
|
if err := wf.ErrOnNotRunning(); err != nil {
|
||||||
|
|
|
@ -20,7 +20,6 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/maps"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFilterSuite(t *testing.T) {
|
func TestFilterSuite(t *testing.T) {
|
||||||
|
@ -110,7 +109,7 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
select {
|
select {
|
||||||
case env := <-ch:
|
case env := <-ch:
|
||||||
s.Require().Equal(maps.Keys(s.contentFilter.ContentTopics)[0], env.Message().GetContentTopic())
|
s.Require().Equal(s.contentFilter.ContentTopicsList()[0], env.Message().GetContentTopic())
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
s.Require().Fail("Message timeout")
|
s.Require().Fail("Message timeout")
|
||||||
case <-s.ctx.Done():
|
case <-s.ctx.Done():
|
||||||
|
@ -118,7 +117,9 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if fn != nil {
|
||||||
fn()
|
fn()
|
||||||
|
}
|
||||||
|
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
}
|
}
|
||||||
|
@ -128,12 +129,14 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope)
|
||||||
go func() {
|
go func() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case env, ok := <-ch:
|
||||||
s.Require().Fail("should not receive another message")
|
if ok {
|
||||||
|
s.Require().Fail("should not receive another message", zap.String("payload", string(env.Message().Payload)))
|
||||||
|
}
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(1 * time.Second):
|
||||||
// Timeout elapsed, all good
|
// Timeout elapsed, all good
|
||||||
case <-s.ctx.Done():
|
case <-s.ctx.Done():
|
||||||
s.Require().Fail("test exceeded allocated time")
|
s.Require().Fail("waitForTimeout test exceeded allocated time")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -216,15 +219,52 @@ func (s *FilterTestSuite) TestWakuFilter() {
|
||||||
s.publishMsg(s.testTopic, "TopicB", "second")
|
s.publishMsg(s.testTopic, "TopicB", "second")
|
||||||
}, s.subDetails[0].C)
|
}, s.subDetails[0].C)
|
||||||
|
|
||||||
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID()))
|
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
// Should not receive after unsubscribe
|
// Should not receive after unsubscribe
|
||||||
s.waitForTimeout(func() {
|
s.waitForTimeout(func() {
|
||||||
s.publishMsg(s.testTopic, s.testContentTopic, "third")
|
s.publishMsg(s.testTopic, s.testContentTopic, "third")
|
||||||
}, s.subDetails[0].C)
|
}, s.subDetails[0].C)
|
||||||
|
|
||||||
|
// Two new subscriptions with same [peer, contentFilter]
|
||||||
|
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
|
||||||
|
secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
|
||||||
|
|
||||||
|
// Assert that we have 2 subscriptions now
|
||||||
|
s.Require().Equal(len(s.lightNode.Subscriptions()), 2)
|
||||||
|
|
||||||
|
// Should be received on both subscriptions
|
||||||
|
s.waitForMsg(func() {
|
||||||
|
s.publishMsg(s.testTopic, s.testContentTopic, "fourth")
|
||||||
|
}, s.subDetails[0].C)
|
||||||
|
|
||||||
|
s.waitForMsg(func() {
|
||||||
|
s.publishMsg(s.testTopic, s.testContentTopic, "fifth")
|
||||||
|
}, secondSub[0].C)
|
||||||
|
|
||||||
|
s.waitForMsg(nil, s.subDetails[0].C)
|
||||||
|
s.waitForMsg(nil, secondSub[0].C)
|
||||||
|
|
||||||
|
// Unsubscribe from second sub only
|
||||||
|
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0])
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
// Should still receive
|
||||||
|
s.waitForMsg(func() {
|
||||||
|
s.publishMsg(s.testTopic, s.testContentTopic, "sixth")
|
||||||
|
}, s.subDetails[0].C)
|
||||||
|
|
||||||
|
// Unsubscribe from first sub only
|
||||||
|
_, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0])
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
s.Require().Equal(len(s.lightNode.Subscriptions()), 0)
|
||||||
|
|
||||||
|
// Should not receive after unsubscribe
|
||||||
|
s.waitForTimeout(func() {
|
||||||
|
s.publishMsg(s.testTopic, s.testContentTopic, "seventh")
|
||||||
|
}, s.subDetails[0].C)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *FilterTestSuite) TestSubscriptionPing() {
|
func (s *FilterTestSuite) TestSubscriptionPing() {
|
||||||
|
@ -441,7 +481,7 @@ func (s *FilterTestSuite) TestAutoShard() {
|
||||||
s.publishMsg(s.testTopic, "TopicB", "second")
|
s.publishMsg(s.testTopic, "TopicB", "second")
|
||||||
}, s.subDetails[0].C)
|
}, s.subDetails[0].C)
|
||||||
|
|
||||||
_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID()))
|
_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
|
@ -15,18 +15,16 @@ import (
|
||||||
|
|
||||||
type (
|
type (
|
||||||
FilterSubscribeParameters struct {
|
FilterSubscribeParameters struct {
|
||||||
host host.Host
|
|
||||||
selectedPeer peer.ID
|
selectedPeer peer.ID
|
||||||
pm *peermanager.PeerManager
|
|
||||||
requestID []byte
|
requestID []byte
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
}
|
|
||||||
|
|
||||||
FilterUnsubscribeParameters struct {
|
// Subscribe-specific
|
||||||
|
host host.Host
|
||||||
|
pm *peermanager.PeerManager
|
||||||
|
|
||||||
|
// Unsubscribe-specific
|
||||||
unsubscribeAll bool
|
unsubscribeAll bool
|
||||||
selectedPeer peer.ID
|
|
||||||
requestID []byte
|
|
||||||
log *zap.Logger
|
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,7 +36,6 @@ type (
|
||||||
Option func(*FilterParameters)
|
Option func(*FilterParameters)
|
||||||
|
|
||||||
FilterSubscribeOption func(*FilterSubscribeParameters)
|
FilterSubscribeOption func(*FilterSubscribeParameters)
|
||||||
FilterUnsubscribeOption func(*FilterUnsubscribeParameters)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func WithTimeout(timeout time.Duration) Option {
|
func WithTimeout(timeout time.Duration) Option {
|
||||||
|
@ -89,7 +86,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Fi
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithRequestID is an option to set a specific request ID to be used when
|
// WithRequestID is an option to set a specific request ID to be used when
|
||||||
// creating a filter subscription
|
// creating/removing a filter subscription
|
||||||
func WithRequestID(requestID []byte) FilterSubscribeOption {
|
func WithRequestID(requestID []byte) FilterSubscribeOption {
|
||||||
return func(params *FilterSubscribeParameters) {
|
return func(params *FilterSubscribeParameters) {
|
||||||
params.requestID = requestID
|
params.requestID = requestID
|
||||||
|
@ -111,51 +108,31 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func UnsubscribeAll() FilterUnsubscribeOption {
|
func UnsubscribeAll() FilterSubscribeOption {
|
||||||
return func(params *FilterUnsubscribeParameters) {
|
return func(params *FilterSubscribeParameters) {
|
||||||
params.unsubscribeAll = true
|
params.unsubscribeAll = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Peer(p peer.ID) FilterUnsubscribeOption {
|
// WithWaitGroup allows specifying a waitgroup to wait until all
|
||||||
return func(params *FilterUnsubscribeParameters) {
|
|
||||||
params.selectedPeer = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestID is an option to set a specific request ID to be used when
|
|
||||||
// removing a subscription from a filter node
|
|
||||||
func RequestID(requestID []byte) FilterUnsubscribeOption {
|
|
||||||
return func(params *FilterUnsubscribeParameters) {
|
|
||||||
params.requestID = requestID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func AutomaticRequestID() FilterUnsubscribeOption {
|
|
||||||
return func(params *FilterUnsubscribeParameters) {
|
|
||||||
params.requestID = protocol.GenerateRequestID()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithWaitGroup allos specigying a waitgroup to wait until all
|
|
||||||
// unsubscribe requests are complete before the function is complete
|
// unsubscribe requests are complete before the function is complete
|
||||||
func WithWaitGroup(wg *sync.WaitGroup) FilterUnsubscribeOption {
|
func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption {
|
||||||
return func(params *FilterUnsubscribeParameters) {
|
return func(params *FilterSubscribeParameters) {
|
||||||
params.wg = wg
|
params.wg = wg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DontWait is used to fire and forget an unsubscription, and don't
|
// DontWait is used to fire and forget an unsubscription, and don't
|
||||||
// care about the results of it
|
// care about the results of it
|
||||||
func DontWait() FilterUnsubscribeOption {
|
func DontWait() FilterSubscribeOption {
|
||||||
return func(params *FilterUnsubscribeParameters) {
|
return func(params *FilterSubscribeParameters) {
|
||||||
params.wg = nil
|
params.wg = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
|
func DefaultUnsubscribeOptions() []FilterSubscribeOption {
|
||||||
return []FilterUnsubscribeOption{
|
return []FilterSubscribeOption{
|
||||||
AutomaticRequestID(),
|
WithAutomaticRequestID(),
|
||||||
WithWaitGroup(&sync.WaitGroup{}),
|
WithWaitGroup(&sync.WaitGroup{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ func TestFilterOption(t *testing.T) {
|
||||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// subscribe options
|
||||||
options := []FilterSubscribeOption{
|
options := []FilterSubscribeOption{
|
||||||
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
||||||
WithAutomaticPeerSelection(),
|
WithAutomaticPeerSelection(),
|
||||||
|
@ -34,13 +35,14 @@ func TestFilterOption(t *testing.T) {
|
||||||
require.Equal(t, host, params.host)
|
require.Equal(t, host, params.host)
|
||||||
require.NotNil(t, params.selectedPeer)
|
require.NotNil(t, params.selectedPeer)
|
||||||
|
|
||||||
options2 := []FilterUnsubscribeOption{
|
// Unsubscribe options
|
||||||
AutomaticRequestID(),
|
options2 := []FilterSubscribeOption{
|
||||||
|
WithAutomaticRequestID(),
|
||||||
UnsubscribeAll(),
|
UnsubscribeAll(),
|
||||||
Peer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
||||||
}
|
}
|
||||||
|
|
||||||
params2 := new(FilterUnsubscribeParameters)
|
params2 := new(FilterSubscribeParameters)
|
||||||
|
|
||||||
for _, opt := range options2 {
|
for _, opt := range options2 {
|
||||||
opt(params2)
|
opt(params2)
|
||||||
|
|
|
@ -84,34 +84,8 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool {
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sub *SubscriptionsMap) Get(peerID peer.ID, cf ContentFilter) *SubscriptionDetails {
|
|
||||||
sub.RLock()
|
|
||||||
defer sub.RUnlock()
|
|
||||||
|
|
||||||
// Check if peer exits
|
|
||||||
peerSubscription, ok := sub.items[peerID]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if pubsub topic exists
|
|
||||||
subscriptions, ok := peerSubscription.subsPerPubsubTopic[cf.PubsubTopic]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the content topic exists within the list of subscriptions for this peer
|
|
||||||
for _, subscription := range subscriptions {
|
|
||||||
if maps.Equal(subscription.ContentFilter.ContentTopics, cf.ContentTopics) {
|
|
||||||
return subscription
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided
|
// Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided
|
||||||
func (sub *SubscriptionsMap) Has(peerID peer.ID, pubsubTopic string, contentTopics ...string) bool {
|
func (sub *SubscriptionsMap) Has(peerID peer.ID, cf ContentFilter) bool {
|
||||||
sub.RLock()
|
sub.RLock()
|
||||||
defer sub.RUnlock()
|
defer sub.RUnlock()
|
||||||
|
|
||||||
|
@ -122,13 +96,13 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, pubsubTopic string, contentTopi
|
||||||
}
|
}
|
||||||
//TODO: Handle pubsubTopic as null
|
//TODO: Handle pubsubTopic as null
|
||||||
// Check if pubsub topic exists
|
// Check if pubsub topic exists
|
||||||
subscriptions, ok := peerSubscription.subsPerPubsubTopic[pubsubTopic]
|
subscriptions, ok := peerSubscription.subsPerPubsubTopic[cf.PubsubTopic]
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the content topic exists within the list of subscriptions for this peer
|
// Check if the content topic exists within the list of subscriptions for this peer
|
||||||
for _, ct := range contentTopics {
|
for _, ct := range cf.ContentTopicsList() {
|
||||||
found := false
|
found := false
|
||||||
for _, subscription := range subscriptions {
|
for _, subscription := range subscriptions {
|
||||||
_, exists := subscription.ContentFilter.ContentTopics[ct]
|
_, exists := subscription.ContentFilter.ContentTopics[ct]
|
||||||
|
|
Loading…
Reference in New Issue