fix: remove automatic relay unsubscribe in a goroutine

This commit is contained in:
Igor Sirotin 2025-05-24 11:46:58 +01:00
parent 900b98812a
commit 1deafd1e47
2 changed files with 14 additions and 16 deletions

View File

@ -12,9 +12,10 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
proto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
@ -388,7 +389,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
}
// subscribe returns list of Subscription to receive messages based on content filter
func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
func (w *WakuRelay) subscribe(contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
var subscriptions []*Subscription
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
@ -438,11 +439,6 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
w.topicsMutex.Unlock()
subscriptions = append(subscriptions, subscription)
go func() {
defer utils.LogOnPanic()
<-ctx.Done()
subscription.Unsubscribe()
}()
}
return subscriptions, nil
@ -450,11 +446,13 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
// Subscribe returns a Subscription to receive messages as per contentFilter
// contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding)
// ctx argument is ignored and left for compatibility.
func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
return w.subscribe(ctx, contentFilter, opts...)
return w.subscribe(contentFilter, opts...)
}
// Unsubscribe closes a subscription to a pubsub topic
// ctx argument is ignored and left for compatibility.
func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error {
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)

View File

@ -41,7 +41,7 @@ func TestWakuRelay(t *testing.T) {
require.NoError(t, err)
defer relay.Stop()
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
require.NoError(t, err)
@ -92,7 +92,7 @@ func TestWakuRelayUnsubscribedTopic(t *testing.T) {
require.NoError(t, err)
defer relay.Stop()
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
require.NoError(t, err)
@ -278,7 +278,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
defer bcaster.Stop()
//Create a contentTopic level subscription
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic))
subs, err := relay.subscribe(protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true)
@ -299,7 +299,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
defer cancel()
//Create a pubSub level subscription
subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
subs1, err := relay.subscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
msg := &pb.WakuMessage{
@ -382,7 +382,7 @@ func TestInvalidMessagePublish(t *testing.T) {
ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second)
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
require.NoError(t, err)
// Test empty contentTopic
@ -403,7 +403,7 @@ func TestInvalidMessagePublish(t *testing.T) {
_, err = relay.Publish(ctx, message, WithPubSubTopic(testTopic))
require.Error(t, err)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
err = relay.Unsubscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
ctxCancel()
@ -459,10 +459,10 @@ func TestWakuRelayStaticSharding(t *testing.T) {
time.Sleep(2 * time.Second)
// Subscribe to valid static shard topic on both hosts
subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
subs1, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err)
subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
subs2, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err)
require.True(t, relay2.IsSubscribed(testTopic))
require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])