2023-09-29 10:43:25 +05:30
|
|
|
package subscription
|
2023-02-14 18:19:38 -04:00
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"sync"
|
|
|
|
|
"testing"
|
|
|
|
|
"time"
|
|
|
|
|
|
2023-09-29 10:43:25 +05:30
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
|
|
|
"github.com/libp2p/go-libp2p/core/test"
|
|
|
|
|
"github.com/stretchr/testify/assert"
|
2023-02-14 18:19:38 -04:00
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
"github.com/waku-org/go-waku/tests"
|
|
|
|
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
2023-05-08 17:33:10 -04:00
|
|
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
2023-11-07 15:48:43 -04:00
|
|
|
"google.golang.org/protobuf/proto"
|
2023-02-14 18:19:38 -04:00
|
|
|
)
|
|
|
|
|
|
2023-09-29 10:43:25 +05:30
|
|
|
// PUBSUB_TOPIC is the base pubsub topic used by the tests in this file; some
// tests append a numeric suffix to it to derive distinct topics.
const PUBSUB_TOPIC = "/test/topic"
|
|
|
|
|
|
|
|
|
|
func createPeerID(t *testing.T) peer.ID {
|
|
|
|
|
peerId, err := test.RandPeerID()
|
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
return peerId
|
|
|
|
|
}
|
|
|
|
|
|
2023-02-14 18:19:38 -04:00
|
|
|
func TestSubscriptionMapAppend(t *testing.T) {
|
2023-05-08 17:33:10 -04:00
|
|
|
fmap := NewSubscriptionMap(utils.Logger())
|
2023-09-11 10:24:05 -04:00
|
|
|
peerID := createPeerID(t)
|
2023-09-29 10:43:25 +05:30
|
|
|
contentTopics := protocol.NewContentTopicSet("ct1", "ct2")
|
2023-02-14 18:19:38 -04:00
|
|
|
|
2023-09-29 10:43:25 +05:30
|
|
|
sub := fmap.NewSubscription(peerID, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC, ContentTopics: contentTopics})
|
2023-09-19 15:52:11 +03:00
|
|
|
_, found := sub.ContentFilter.ContentTopics["ct1"]
|
2023-02-14 18:19:38 -04:00
|
|
|
require.True(t, found)
|
2023-09-19 15:52:11 +03:00
|
|
|
_, found = sub.ContentFilter.ContentTopics["ct2"]
|
2023-02-14 18:19:38 -04:00
|
|
|
require.True(t, found)
|
2023-05-08 17:33:10 -04:00
|
|
|
require.False(t, sub.Closed)
|
2023-09-11 10:24:05 -04:00
|
|
|
require.Equal(t, sub.PeerID, peerID)
|
2023-09-19 15:52:11 +03:00
|
|
|
require.Equal(t, sub.ContentFilter.PubsubTopic, PUBSUB_TOPIC)
|
2023-02-14 18:19:38 -04:00
|
|
|
|
|
|
|
|
sub.Add("ct3")
|
2023-09-19 15:52:11 +03:00
|
|
|
_, found = sub.ContentFilter.ContentTopics["ct3"]
|
2023-02-14 18:19:38 -04:00
|
|
|
require.True(t, found)
|
|
|
|
|
|
|
|
|
|
sub.Remove("ct3")
|
2023-09-19 15:52:11 +03:00
|
|
|
_, found = sub.ContentFilter.ContentTopics["ct3"]
|
2023-02-14 18:19:38 -04:00
|
|
|
require.False(t, found)
|
|
|
|
|
|
|
|
|
|
err := sub.Close()
|
|
|
|
|
require.NoError(t, err)
|
2023-05-08 17:33:10 -04:00
|
|
|
require.True(t, sub.Closed)
|
2023-02-14 18:19:38 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscriptionClear(t *testing.T) {
|
2023-05-08 17:33:10 -04:00
|
|
|
fmap := NewSubscriptionMap(utils.Logger())
|
2023-09-29 10:43:25 +05:30
|
|
|
contentTopics := protocol.NewContentTopicSet("ct1", "ct2")
|
2023-02-14 18:19:38 -04:00
|
|
|
|
|
|
|
|
var subscriptions = []*SubscriptionDetails{
|
2023-09-29 10:43:25 +05:30
|
|
|
fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: contentTopics}),
|
|
|
|
|
fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: contentTopics}),
|
|
|
|
|
fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "3", ContentTopics: contentTopics}),
|
2023-02-14 18:19:38 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
|
wg.Add(len(subscriptions))
|
|
|
|
|
for _, s := range subscriptions {
|
|
|
|
|
go func(s *SubscriptionDetails) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
t.Fail()
|
|
|
|
|
return
|
|
|
|
|
case <-s.C:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}(s)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fmap.Clear()
|
|
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
|
2023-05-08 17:33:10 -04:00
|
|
|
require.True(t, subscriptions[0].Closed)
|
|
|
|
|
require.True(t, subscriptions[1].Closed)
|
|
|
|
|
require.True(t, subscriptions[2].Closed)
|
2023-02-14 18:19:38 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscriptionsNotify(t *testing.T) {
|
2023-05-08 17:33:10 -04:00
|
|
|
fmap := NewSubscriptionMap(utils.Logger())
|
2023-09-11 10:24:05 -04:00
|
|
|
p1 := createPeerID(t)
|
|
|
|
|
p2 := createPeerID(t)
|
2023-02-14 18:19:38 -04:00
|
|
|
var subscriptions = []*SubscriptionDetails{
|
2023-09-29 10:43:25 +05:30
|
|
|
fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}),
|
|
|
|
|
fmap.NewSubscription(p2, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1")}),
|
|
|
|
|
fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}),
|
2023-02-14 18:19:38 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
|
|
successChan := make(chan struct{}, 10)
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
|
|
|
|
|
|
successOnReceive := func(ctx context.Context, i int) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
2023-05-08 17:33:10 -04:00
|
|
|
if subscriptions[i].Closed {
|
2023-02-14 18:19:38 -04:00
|
|
|
successChan <- struct{}{}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
panic("should have failed1")
|
|
|
|
|
case c := <-subscriptions[i].C:
|
|
|
|
|
if c == nil {
|
|
|
|
|
panic("should have failed2")
|
|
|
|
|
}
|
|
|
|
|
successChan <- struct{}{}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
failOnReceive := func(ctx context.Context, i int) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
2023-05-08 17:33:10 -04:00
|
|
|
if subscriptions[i].Closed {
|
2023-02-14 18:19:38 -04:00
|
|
|
successChan <- struct{}{}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
successChan <- struct{}{}
|
|
|
|
|
return
|
|
|
|
|
case c := <-subscriptions[i].C:
|
|
|
|
|
if c != nil {
|
|
|
|
|
panic("should have failed")
|
|
|
|
|
}
|
|
|
|
|
successChan <- struct{}{}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wg.Add(3)
|
|
|
|
|
go successOnReceive(ctx, 0)
|
|
|
|
|
go successOnReceive(ctx, 1)
|
|
|
|
|
go failOnReceive(ctx, 2)
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
2023-11-07 15:48:43 -04:00
|
|
|
envTopic1Ct1 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", nil), 0, PUBSUB_TOPIC+"1")
|
2023-02-14 18:19:38 -04:00
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
2024-07-15 19:47:27 +05:30
|
|
|
fmap.Notify(ctx, p1, envTopic1Ct1)
|
|
|
|
|
fmap.Notify(ctx, p2, envTopic1Ct1)
|
2023-02-14 18:19:38 -04:00
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
<-successChan
|
|
|
|
|
<-successChan
|
|
|
|
|
cancel()
|
|
|
|
|
wg.Wait()
|
|
|
|
|
<-successChan
|
|
|
|
|
|
|
|
|
|
//////////////////////////////////////
|
|
|
|
|
|
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
|
|
|
|
|
|
|
wg.Add(3)
|
|
|
|
|
go successOnReceive(ctx, 0)
|
|
|
|
|
go failOnReceive(ctx, 1)
|
|
|
|
|
go failOnReceive(ctx, 2)
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
2023-11-07 15:48:43 -04:00
|
|
|
envTopic1Ct2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct2", nil), 0, PUBSUB_TOPIC+"1")
|
2023-02-14 18:19:38 -04:00
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
2024-07-15 19:47:27 +05:30
|
|
|
fmap.Notify(ctx, p1, envTopic1Ct2)
|
|
|
|
|
fmap.Notify(ctx, p2, envTopic1Ct2)
|
2023-02-14 18:19:38 -04:00
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
<-successChan
|
|
|
|
|
cancel()
|
|
|
|
|
wg.Wait()
|
|
|
|
|
<-successChan
|
|
|
|
|
<-successChan
|
|
|
|
|
|
|
|
|
|
//////////////////////////////////////
|
|
|
|
|
|
|
|
|
|
// Testing after closing the subscription
|
|
|
|
|
|
|
|
|
|
subscriptions[0].Close()
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
|
|
|
|
|
|
|
wg.Add(3)
|
|
|
|
|
go failOnReceive(ctx, 0)
|
|
|
|
|
go successOnReceive(ctx, 1)
|
|
|
|
|
go failOnReceive(ctx, 2)
|
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
|
|
2023-11-07 15:48:43 -04:00
|
|
|
envTopic1Ct1_2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", proto.Int64(1)), 1, PUBSUB_TOPIC+"1")
|
2023-02-14 18:19:38 -04:00
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
2024-07-15 19:47:27 +05:30
|
|
|
fmap.Notify(ctx, p1, envTopic1Ct1_2)
|
|
|
|
|
fmap.Notify(ctx, p2, envTopic1Ct1_2)
|
2023-02-14 18:19:38 -04:00
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
<-successChan // One of these successes is for closing the subscription
|
|
|
|
|
<-successChan
|
|
|
|
|
cancel()
|
|
|
|
|
wg.Wait()
|
|
|
|
|
<-successChan
|
|
|
|
|
}
|
2026-04-21 13:10:52 +03:00
|
|
|
|
|
|
|
|
// TestSetClosingDoesNotHoldInnerLock verifies that SetClosing does not leave
|
|
|
|
|
// the SubscriptionDetails RWMutex held when the Closing channel has no ready
|
|
|
|
|
// receiver
|
|
|
|
|
func TestSetClosingDoesNotHoldInnerLock(t *testing.T) {
|
|
|
|
|
fmap := NewSubscriptionMap(utils.Logger())
|
|
|
|
|
peerID := createPeerID(t)
|
|
|
|
|
sub := fmap.NewSubscription(peerID, protocol.ContentFilter{
|
|
|
|
|
PubsubTopic: PUBSUB_TOPIC,
|
|
|
|
|
ContentTopics: protocol.NewContentTopicSet("ct1"),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// Intentionally do NOT spawn a receiver on sub.Closing — reproduces the
|
|
|
|
|
// scenario where the api/filter multiplex goroutine or its downstream
|
|
|
|
|
// apiSub.closing consumer is stalled (needing outer mapRef.Lock that
|
|
|
|
|
// another goroutine holds as outer RLock).
|
|
|
|
|
setClosingDone := make(chan struct{})
|
|
|
|
|
go func() {
|
|
|
|
|
sub.SetClosing()
|
|
|
|
|
close(setClosingDone)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Give SetClosing time to reach the blocking send (if unpatched).
|
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
// A parallel reader that exercises the real GetSubscriptionsForPeer ->
|
|
|
|
|
// isPartOf path. isPartOf takes s.RLock() on the SubscriptionDetails.
|
|
|
|
|
readerDone := make(chan []*SubscriptionDetails, 1)
|
|
|
|
|
go func() {
|
|
|
|
|
readerDone <- fmap.GetSubscriptionsForPeer(peerID, protocol.ContentFilter{})
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case subs := <-readerDone:
|
|
|
|
|
require.Len(t, subs, 1)
|
|
|
|
|
case <-time.After(time.Second):
|
|
|
|
|
t.Fatal("reader blocked: SetClosing is holding SubscriptionDetails lock while sending on unbuffered Closing channel (deadlock)")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// After the fix, SetClosing itself should also complete — either because
|
|
|
|
|
// the channel is buffered(1) and the send is instant, or because a select-
|
|
|
|
|
// default drops the send when no one is reading. Either is acceptable.
|
|
|
|
|
select {
|
|
|
|
|
case <-setClosingDone:
|
|
|
|
|
case <-time.After(time.Second):
|
|
|
|
|
t.Fatal("SetClosing never returned without a receiver — inner lock or channel send is still blocking")
|
|
|
|
|
}
|
|
|
|
|
}
|