Merge branch 'master' into jazzz/fix_example_link

This commit is contained in:
Jazz Turner-Baggs 2025-05-29 10:36:40 -07:00 committed by GitHub
commit e0a749b3ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 20 additions and 16 deletions

View File

@ -3,8 +3,9 @@ package relay
import ( import (
"context" "context"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"github.com/waku-org/go-waku/waku/v2/protocol"
) )
// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic. // Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic.
@ -55,3 +56,7 @@ func NewSubscription(contentFilter protocol.ContentFilter) *Subscription {
subType: subType, subType: subType,
} }
} }
func (s *Subscription) ContentFilter() protocol.ContentFilter {
return s.contentFilter
}

View File

@ -12,9 +12,10 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap" "go.uber.org/zap"
proto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
@ -388,7 +389,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
} }
// subscribe returns list of Subscription to receive messages based on content filter // subscribe returns list of Subscription to receive messages based on content filter
func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { func (w *WakuRelay) subscribe(contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
var subscriptions []*Subscription var subscriptions []*Subscription
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
@ -438,11 +439,6 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
w.topicsMutex.Unlock() w.topicsMutex.Unlock()
subscriptions = append(subscriptions, subscription) subscriptions = append(subscriptions, subscription)
go func() {
defer utils.LogOnPanic()
<-ctx.Done()
subscription.Unsubscribe()
}()
} }
return subscriptions, nil return subscriptions, nil
@ -450,11 +446,13 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
// Subscribe returns a Subscription to receive messages as per contentFilter // Subscribe returns a Subscription to receive messages as per contentFilter
// contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding) // contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding)
// ctx argument is ignored and left for compatibility.
func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) { func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
return w.subscribe(ctx, contentFilter, opts...) return w.subscribe(contentFilter, opts...)
} }
// Unsubscribe closes a subscription to a pubsub topic // Unsubscribe closes a subscription to a pubsub topic
// ctx argument is ignored and left for compatibility.
func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error { func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error {
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)

View File

@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
@ -41,7 +42,7 @@ func TestWakuRelay(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer relay.Stop() defer relay.Stop()
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
require.NoError(t, err) require.NoError(t, err)
@ -92,7 +93,7 @@ func TestWakuRelayUnsubscribedTopic(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer relay.Stop() defer relay.Stop()
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
require.NoError(t, err) require.NoError(t, err)
@ -278,7 +279,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
defer bcaster.Stop() defer bcaster.Stop()
//Create a contentTopic level subscription //Create a contentTopic level subscription
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic)) subs, err := relay.subscribe(protocol.NewContentFilter("", testcTopic))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true) require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true)
@ -299,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
defer cancel() defer cancel()
//Create a pubSub level subscription //Create a pubSub level subscription
subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic)) subs1, err := relay.subscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err) require.NoError(t, err)
msg := &pb.WakuMessage{ msg := &pb.WakuMessage{
@ -382,7 +383,7 @@ func TestInvalidMessagePublish(t *testing.T) {
ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second)
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
require.NoError(t, err) require.NoError(t, err)
// Test empty contentTopic // Test empty contentTopic
@ -459,10 +460,10 @@ func TestWakuRelayStaticSharding(t *testing.T) {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
// Subscribe to valid static shard topic on both hosts // Subscribe to valid static shard topic on both hosts
subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) subs1, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err) require.NoError(t, err)
subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) subs2, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err) require.NoError(t, err)
require.True(t, relay2.IsSubscribed(testTopic)) require.True(t, relay2.IsSubscribed(testTopic))
require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0]) require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])