diff --git a/waku/v2/protocol/relay/subscription.go b/waku/v2/protocol/relay/subscription.go index fc1df406..d60e1cf6 100644 --- a/waku/v2/protocol/relay/subscription.go +++ b/waku/v2/protocol/relay/subscription.go @@ -3,8 +3,9 @@ package relay import ( "context" - "github.com/waku-org/go-waku/waku/v2/protocol" "golang.org/x/exp/slices" + + "github.com/waku-org/go-waku/waku/v2/protocol" ) // Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic. @@ -55,3 +56,7 @@ func NewSubscription(contentFilter protocol.ContentFilter) *Subscription { subType: subType, } } + +func (s *Subscription) ContentFilter() protocol.ContentFilter { + return s.contentFilter +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 24964dd1..66c5de4d 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -12,9 +12,10 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - proto "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/proto" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" @@ -388,7 +389,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool { } // subscribe returns list of Subscription to receive messages based on content filter -func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { +func (w *WakuRelay) subscribe(contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { var subscriptions []*Subscription pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) @@ -438,11 +439,6 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont w.topicsMutex.Unlock() subscriptions = append(subscriptions, subscription) - go func() { - defer utils.LogOnPanic() - <-ctx.Done() - subscription.Unsubscribe() - }() } return subscriptions, nil @@ -450,11 +446,13 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont // Subscribe returns a Subscription to receive messages as per contentFilter // contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding) +// ctx argument is ignored and left for compatibility. func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { - return w.subscribe(ctx, contentFilter, opts...) + return w.subscribe(contentFilter, opts...) } // Unsubscribe closes a subscription to a pubsub topic +// ctx argument is ignored and left for compatibility. func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error { pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 1e2f3781..ff9b94ee 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -41,7 +42,7 @@ func TestWakuRelay(t *testing.T) { require.NoError(t, err) defer relay.Stop() - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter(testTopic)) require.NoError(t, err) @@ -92,7 +93,7 @@ func TestWakuRelayUnsubscribedTopic(t *testing.T) { require.NoError(t, err) defer relay.Stop() - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter(testTopic)) require.NoError(t, err) @@ -278,7 +279,7 @@ func TestWakuRelayAutoShard(t *testing.T) { defer bcaster.Stop() //Create a contentTopic level subscription - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter("", testcTopic)) require.NoError(t, err) require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true) @@ -299,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) { defer cancel() //Create a pubSub level subscription - subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) + subs1, err := relay.subscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) require.NoError(t, err) msg := &pb.WakuMessage{ @@ -382,7 +383,7 @@ func TestInvalidMessagePublish(t *testing.T) { ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second) - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter(testTopic)) require.NoError(t, err) // Test empty contentTopic @@ -459,10 +460,10 @@ func TestWakuRelayStaticSharding(t *testing.T) { time.Sleep(2 * time.Second) // Subscribe to valid static shard topic on both hosts - subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + subs1, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic)) require.NoError(t, err) - subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + subs2, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic)) require.NoError(t, err) require.True(t, relay2.IsSubscribed(testTopic)) require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])