chore: tests for subscribers and subscrtiption maps

This commit is contained in:
Richard Ramos 2023-02-14 18:19:38 -04:00 committed by RichΛrd
parent b816434843
commit 600a8f1c8d
7 changed files with 411 additions and 15 deletions

View File

@ -213,8 +213,8 @@ func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFi
return nil
}
// SubscriptionChannel is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterPush) SubscriptionChannel(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails {
// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterPush) FilterSubscription(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails {
return wf.subscriptions.NewSubscription(peerID, topic, contentTopics)
}

View File

@ -89,6 +89,12 @@ func UnsubscribeAll() FilterUnsubscribeOption {
}
}
func Peer(p peer.ID) FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.selectedPeer = p
}
}
func RequestID(requestId []byte) FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.requestId = requestId

View File

@ -0,0 +1,52 @@
package filterv2
import (
"context"
"crypto/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestFilterOption(t *testing.T) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
options := []FilterSubscribeOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(context.Background()),
}
params := new(FilterSubscribeParameters)
params.host = host
params.log = utils.Logger()
for _, opt := range options {
opt(params)
}
require.Equal(t, host, params.host)
require.NotNil(t, params.selectedPeer)
options2 := []FilterUnsubscribeOption{
AutomaticRequestId(),
UnsubscribeAll(),
Peer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
}
params2 := new(FilterUnsubscribeParameters)
for _, opt := range options2 {
opt(params2)
}
require.NotNil(t, params2.selectedPeer)
require.True(t, params2.unsubscribeAll)
}

View File

@ -104,14 +104,19 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop
sub.removeFromInterestMap(peerID, pubsubTopic, c)
}
pubsubTopicMap[pubsubTopic] = contentTopicsMap
// No more content topics available. Removing content topic completely
if len(contentTopicsMap) == 0 {
delete(pubsubTopicMap, pubsubTopic)
}
pubsubTopicMap[pubsubTopic] = contentTopicsMap
sub.items[peerID] = pubsubTopicMap
if len(sub.items[peerID]) == 0 {
delete(sub.items, peerID)
}
return nil
}
@ -129,6 +134,7 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error {
}
delete(sub.items, peerID)
delete(sub.failedPeers, peerID)
return nil
}

View File

@ -0,0 +1,127 @@
package filterv2
import (
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const TOPIC = "/test/topic"
func createPeerId(t *testing.T) peer.ID {
peerId, err := test.RandPeerID()
assert.NoError(t, err)
return peerId
}
func firstSubscriber(subs *SubscribersMap, pubsubTopic string, contentTopic string) peer.ID {
for sub := range subs.Items(pubsubTopic, contentTopic) {
return sub
}
return ""
}
func TestAppend(t *testing.T) {
subs := NewSubscribersMap(5 * time.Second)
peerId := createPeerId(t)
subs.Set(peerId, TOPIC, []string{"topic1"})
sub := firstSubscriber(subs, TOPIC, "topic1")
assert.NotNil(t, sub)
// Adding to existing peer
subs.Set(peerId, TOPIC, []string{"topic2"})
sub = firstSubscriber(subs, TOPIC, "topic2")
assert.NotNil(t, sub)
subs.Set(peerId, TOPIC+"2", []string{"topic1"})
sub = firstSubscriber(subs, TOPIC+"2", "topic1")
assert.NotNil(t, sub)
sub = firstSubscriber(subs, TOPIC, "topic2")
assert.Nil(t, sub)
}
func TestRemove(t *testing.T) {
subs := NewSubscribersMap(5 * time.Second)
peerId := createPeerId(t)
subs.Set(peerId, TOPIC+"1", []string{"topic1", "topic2"})
subs.Set(peerId, TOPIC+"2", []string{"topic1"})
subs.DeleteAll(peerId)
sub := firstSubscriber(subs, TOPIC+"1", "topic1")
assert.Nil(t, sub)
sub = firstSubscriber(subs, TOPIC+"1", "topic2")
assert.Nil(t, sub)
sub = firstSubscriber(subs, TOPIC+"2", "topic1")
assert.Nil(t, sub)
assert.False(t, subs.Has(peerId))
_, found := subs.Get(peerId)
assert.False(t, found)
_, ok := subs.items[peerId]
assert.False(t, ok)
}
func TestRemovePartial(t *testing.T) {
subs := NewSubscribersMap(5 * time.Second)
peerId := createPeerId(t)
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
err := subs.Delete(peerId, TOPIC, []string{"topic1"})
require.NoError(t, err)
sub := firstSubscriber(subs, TOPIC, "topic2")
assert.NotNil(t, sub)
}
func TestRemoveBogus(t *testing.T) {
subs := NewSubscribersMap(5 * time.Second)
peerId := createPeerId(t)
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
err := subs.Delete(peerId, TOPIC, []string{"does not exist", "topic1"})
require.NoError(t, err)
sub := firstSubscriber(subs, TOPIC, "topic1")
assert.Nil(t, sub)
sub = firstSubscriber(subs, TOPIC, "does not exist")
assert.Nil(t, sub)
err = subs.Delete(peerId, "DOES_NOT_EXIST", []string{"topic1"})
require.Error(t, err)
}
func TestSuccessFailure(t *testing.T) {
subs := NewSubscribersMap(5 * time.Second)
peerId := createPeerId(t)
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
subs.FlagAsFailure(peerId)
require.True(t, subs.IsFailedPeer(peerId))
subs.FlagAsFailure(peerId)
require.False(t, subs.Has(peerId))
subs.Set(peerId, TOPIC, []string{"topic1", "topic2"})
subs.FlagAsFailure(peerId)
require.True(t, subs.IsFailedPeer(peerId))
subs.FlagAsSuccess(peerId)
require.False(t, subs.IsFailedPeer(peerId))
}

View File

@ -59,11 +59,12 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, conte
}
details := &SubscriptionDetails{
id: uuid.NewString(),
mapRef: sub,
peerID: peerID,
pubsubTopic: topic,
C: make(chan *protocol.Envelope),
id: uuid.NewString(),
mapRef: sub,
peerID: peerID,
pubsubTopic: topic,
C: make(chan *protocol.Envelope),
contentTopics: make(map[string]struct{}),
}
for _, ct := range contentTopics {
@ -89,7 +90,7 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
return nil
}
func (s *SubscriptionDetails) Add(contentTopics []string) {
func (s *SubscriptionDetails) Add(contentTopics ...string) {
s.Lock()
defer s.Unlock()
@ -98,7 +99,7 @@ func (s *SubscriptionDetails) Add(contentTopics []string) {
}
}
func (s *SubscriptionDetails) Remove(contentTopics []string) {
func (s *SubscriptionDetails) Remove(contentTopics ...string) {
s.Lock()
defer s.Unlock()
@ -149,11 +150,6 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope)
if ok {
iterateSubscriptionSet(subscriptions, envelope)
}
subscriptionsWithNoPeer, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()]
if ok {
iterateSubscriptionSet(subscriptionsWithNoPeer, envelope)
}
}
func iterateSubscriptionSet(subscriptions SubscriptionSet, envelope *protocol.Envelope) {

View File

@ -0,0 +1,209 @@
package filterv2
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
func TestSubscriptionMapAppend(t *testing.T) {
fmap := NewSubscriptionMap()
peerId := createPeerId(t)
contentTopics := []string{"ct1", "ct2"}
sub := fmap.NewSubscription(peerId, TOPIC, contentTopics)
_, found := sub.contentTopics["ct1"]
require.True(t, found)
_, found = sub.contentTopics["ct2"]
require.True(t, found)
require.False(t, sub.closed)
require.Equal(t, sub.peerID, peerId)
require.Equal(t, sub.pubsubTopic, TOPIC)
sub.Add("ct3")
_, found = sub.contentTopics["ct3"]
require.True(t, found)
sub.Remove("ct3")
_, found = sub.contentTopics["ct3"]
require.False(t, found)
err := sub.Close()
require.NoError(t, err)
require.True(t, sub.closed)
err = sub.Close()
require.NoError(t, err)
}
func TestSubscriptionClear(t *testing.T) {
fmap := NewSubscriptionMap()
contentTopics := []string{"ct1", "ct2"}
var subscriptions = []*SubscriptionDetails{
fmap.NewSubscription(createPeerId(t), TOPIC+"1", contentTopics),
fmap.NewSubscription(createPeerId(t), TOPIC+"2", contentTopics),
fmap.NewSubscription(createPeerId(t), TOPIC+"3", contentTopics),
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
wg := sync.WaitGroup{}
wg.Add(len(subscriptions))
for _, s := range subscriptions {
go func(s *SubscriptionDetails) {
defer wg.Done()
select {
case <-ctx.Done():
t.Fail()
return
case <-s.C:
return
}
}(s)
}
fmap.Clear()
wg.Wait()
require.True(t, subscriptions[0].closed)
require.True(t, subscriptions[1].closed)
require.True(t, subscriptions[2].closed)
}
func TestSubscriptionsNotify(t *testing.T) {
fmap := NewSubscriptionMap()
p1 := createPeerId(t)
p2 := createPeerId(t)
var subscriptions = []*SubscriptionDetails{
fmap.NewSubscription(p1, TOPIC+"1", []string{"ct1", "ct2"}),
fmap.NewSubscription(p2, TOPIC+"1", []string{"ct1"}),
fmap.NewSubscription(p1, TOPIC+"2", []string{"ct1", "ct2"}),
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
successChan := make(chan struct{}, 10)
wg := sync.WaitGroup{}
successOnReceive := func(ctx context.Context, i int) {
defer wg.Done()
if subscriptions[i].closed {
successChan <- struct{}{}
return
}
select {
case <-ctx.Done():
panic("should have failed1")
case c := <-subscriptions[i].C:
if c == nil {
panic("should have failed2")
}
successChan <- struct{}{}
return
}
}
failOnReceive := func(ctx context.Context, i int) {
defer wg.Done()
if subscriptions[i].closed {
successChan <- struct{}{}
return
}
select {
case <-ctx.Done():
successChan <- struct{}{}
return
case c := <-subscriptions[i].C:
if c != nil {
panic("should have failed")
}
successChan <- struct{}{}
return
}
}
wg.Add(3)
go successOnReceive(ctx, 0)
go successOnReceive(ctx, 1)
go failOnReceive(ctx, 2)
time.Sleep(200 * time.Millisecond)
envTopic1Ct1 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 0), 0, TOPIC+"1")
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct1)
fmap.Notify(p2, envTopic1Ct1)
}()
<-successChan
<-successChan
cancel()
wg.Wait()
<-successChan
//////////////////////////////////////
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
wg.Add(3)
go successOnReceive(ctx, 0)
go failOnReceive(ctx, 1)
go failOnReceive(ctx, 2)
time.Sleep(200 * time.Millisecond)
envTopic1Ct2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct2", 0), 0, TOPIC+"1")
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct2)
fmap.Notify(p2, envTopic1Ct2)
}()
<-successChan
cancel()
wg.Wait()
<-successChan
<-successChan
//////////////////////////////////////
// Testing after closing the subscription
subscriptions[0].Close()
time.Sleep(200 * time.Millisecond)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
wg.Add(3)
go failOnReceive(ctx, 0)
go successOnReceive(ctx, 1)
go failOnReceive(ctx, 2)
time.Sleep(200 * time.Millisecond)
envTopic1Ct1_2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 1), 1, TOPIC+"1")
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct1_2)
fmap.Notify(p2, envTopic1Ct1_2)
}()
<-successChan // One of these successes is for closing the subscription
<-successChan
cancel()
wg.Wait()
<-successChan
}