From 1deafd1e476c9c4570eadac4a9f99c5ad4aed582 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Sat, 24 May 2025 11:46:58 +0100 Subject: [PATCH] fix: remove automatic relay unsubscribe in a goroutine --- waku/v2/protocol/relay/waku_relay.go | 14 ++++++-------- waku/v2/protocol/relay/waku_relay_test.go | 16 ++++++++-------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 24964dd1..66c5de4d 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -12,9 +12,10 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - proto "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/proto" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" @@ -388,7 +389,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool { } // subscribe returns list of Subscription to receive messages based on content filter -func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { +func (w *WakuRelay) subscribe(contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { var subscriptions []*Subscription pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) @@ -438,11 +439,6 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont w.topicsMutex.Unlock() subscriptions = append(subscriptions, subscription) - go func() { - defer utils.LogOnPanic() - <-ctx.Done() - subscription.Unsubscribe() - }() } return subscriptions, nil @@ -450,11 +446,13 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont // Subscribe returns a Subscription to receive messages as per contentFilter // contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding) +// ctx argument is ignored and left for compatibility. func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { - return w.subscribe(ctx, contentFilter, opts...) + return w.subscribe(contentFilter, opts...) } // Unsubscribe closes a subscription to a pubsub topic +// ctx argument is ignored and left for compatibility. func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error { pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 1e2f3781..27d1e016 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -41,7 +41,7 @@ func TestWakuRelay(t *testing.T) { require.NoError(t, err) defer relay.Stop() - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter(testTopic)) require.NoError(t, err) @@ -92,7 +92,7 @@ func TestWakuRelayUnsubscribedTopic(t *testing.T) { require.NoError(t, err) defer relay.Stop() - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter(testTopic)) require.NoError(t, err) @@ -278,7 +278,7 @@ func TestWakuRelayAutoShard(t *testing.T) { defer bcaster.Stop() //Create a contentTopic level subscription - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter("", testcTopic)) require.NoError(t, err) require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true) @@ -299,7 +299,7 @@ func TestWakuRelayAutoShard(t *testing.T) { defer cancel() //Create a pubSub level subscription - subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) + subs1, err := relay.subscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) require.NoError(t, err) msg := &pb.WakuMessage{ @@ -382,7 +382,7 @@ func TestInvalidMessagePublish(t *testing.T) { ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second) - subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + subs, err := relay.subscribe(protocol.NewContentFilter(testTopic)) require.NoError(t, err) // Test empty contentTopic @@ -403,7 +403,7 @@ func TestInvalidMessagePublish(t *testing.T) { _, err = relay.Publish(ctx, message, WithPubSubTopic(testTopic)) require.Error(t, err) - err = relay.Unsubscribe(ctx, protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) + err = relay.Unsubscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) require.NoError(t, err) ctxCancel() @@ -459,10 +459,10 @@ func TestWakuRelayStaticSharding(t *testing.T) { time.Sleep(2 * time.Second) // Subscribe to valid static shard topic on both hosts - subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + subs1, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic)) require.NoError(t, err) - subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + subs2, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic)) require.NoError(t, err) require.True(t, relay2.IsSubscribed(testTopic)) require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])