mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-02 14:03:06 +00:00
fix: remove automatic relay unsubscribe in a goroutine (#1284)
This commit is contained in:
parent
900b98812a
commit
5dea6d3bce
@ -3,8 +3,9 @@ package relay
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
)
|
||||
|
||||
// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic.
|
||||
@ -55,3 +56,7 @@ func NewSubscription(contentFilter protocol.ContentFilter) *Subscription {
|
||||
subType: subType,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscription) ContentFilter() protocol.ContentFilter {
|
||||
return s.contentFilter
|
||||
}
|
||||
|
||||
@ -12,9 +12,10 @@ import (
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
proto "google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
@ -388,7 +389,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
|
||||
}
|
||||
|
||||
// subscribe returns list of Subscription to receive messages based on content filter
|
||||
func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
|
||||
func (w *WakuRelay) subscribe(contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
|
||||
|
||||
var subscriptions []*Subscription
|
||||
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
|
||||
@ -438,11 +439,6 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
|
||||
w.topicsMutex.Unlock()
|
||||
|
||||
subscriptions = append(subscriptions, subscription)
|
||||
go func() {
|
||||
defer utils.LogOnPanic()
|
||||
<-ctx.Done()
|
||||
subscription.Unsubscribe()
|
||||
}()
|
||||
}
|
||||
|
||||
return subscriptions, nil
|
||||
@ -450,11 +446,13 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
|
||||
|
||||
// Subscribe returns a Subscription to receive messages as per contentFilter
|
||||
// contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding)
|
||||
// ctx argument is ignored and left for compatibility.
|
||||
func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
|
||||
return w.subscribe(ctx, contentFilter, opts...)
|
||||
return w.subscribe(contentFilter, opts...)
|
||||
}
|
||||
|
||||
// Unsubscribe closes a subscription to a pubsub topic
|
||||
// ctx argument is ignored and left for compatibility.
|
||||
func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error {
|
||||
|
||||
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
@ -41,7 +42,7 @@ func TestWakuRelay(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer relay.Stop()
|
||||
|
||||
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
|
||||
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -92,7 +93,7 @@ func TestWakuRelayUnsubscribedTopic(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer relay.Stop()
|
||||
|
||||
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
|
||||
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -278,7 +279,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
|
||||
defer bcaster.Stop()
|
||||
|
||||
//Create a contentTopic level subscription
|
||||
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic))
|
||||
subs, err := relay.subscribe(protocol.NewContentFilter("", testcTopic))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true)
|
||||
|
||||
@ -299,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
//Create a pubSub level subscription
|
||||
subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
|
||||
subs1, err := relay.subscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := &pb.WakuMessage{
|
||||
@ -382,7 +383,7 @@ func TestInvalidMessagePublish(t *testing.T) {
|
||||
|
||||
ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
|
||||
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
|
||||
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test empty contentTopic
|
||||
@ -459,10 +460,10 @@ func TestWakuRelayStaticSharding(t *testing.T) {
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Subscribe to valid static shard topic on both hosts
|
||||
subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
|
||||
subs1, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
|
||||
subs2, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
|
||||
require.NoError(t, err)
|
||||
require.True(t, relay2.IsSubscribed(testTopic))
|
||||
require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user