From 99d2477035e4f97446e9dce33642a23cb1fc8155 Mon Sep 17 00:00:00 2001 From: kaichao Date: Sat, 31 Aug 2024 09:22:59 +0800 Subject: [PATCH] fix: check subscription when relay publish message (#1212) --- cmd/waku/server/rest/lightpush_rest_test.go | 9 +++- cmd/waku/server/rest/relay_test.go | 9 +++- waku/v2/api/publish/message_sender_test.go | 6 +++ waku/v2/node/wakunode2_test.go | 4 +- waku/v2/protocol/filter/filter_test.go | 14 +++-- .../protocol/lightpush/waku_lightpush_test.go | 6 ++- waku/v2/protocol/relay/waku_relay.go | 8 +++ waku/v2/protocol/relay/waku_relay_test.go | 54 +++++++++++++++++++ 8 files changed, 99 insertions(+), 11 deletions(-) diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go index 54173909..7cf05f09 100644 --- a/cmd/waku/server/rest/lightpush_rest_test.go +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -2,6 +2,7 @@ package rest import ( "bytes" + "context" "encoding/json" "net/http" "net/http/httptest" @@ -13,6 +14,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -22,8 +24,13 @@ func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNod node1 := createNode(t, node.WithLightPush(), node.WithWakuRelay()) node2 := createNode(t, node.WithLightPush(), node.WithWakuRelay()) + _, err := node1.Relay().Subscribe(context.Background(), protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + _, err = node2.Relay().Subscribe(context.Background(), protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) - err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1) + err = node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1) require.NoError(t, err) err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) require.NoError(t, err) diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 24aedb7e..7697af29 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" @@ -34,8 +35,9 @@ func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService { func TestPostV1Message(t *testing.T) { router := chi.NewRouter() + testTopic := "test" - _ = makeRelayService(t, router) + r := makeRelayService(t, router) msg := &RestWakuMessage{ Payload: []byte{1, 2, 3}, ContentTopic: "abc", @@ -44,8 +46,11 @@ func TestPostV1Message(t *testing.T) { msgJSONBytes, err := json.Marshal(msg) require.NoError(t, err) + _, err = r.node.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + require.NoError(t, err) + rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/"+testTopic, bytes.NewReader(msgJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) diff --git a/waku/v2/api/publish/message_sender_test.go b/waku/v2/api/publish/message_sender_test.go index d6945c8c..e5f1626e 100644 --- a/waku/v2/api/publish/message_sender_test.go +++ b/waku/v2/api/publish/message_sender_test.go @@ -50,6 +50,9 @@ func TestNewSenderWithRelay(t *testing.T) { err := relayNode.Start(context.Background()) require.Nil(t, err) defer relayNode.Stop() + + _, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic")) + require.Nil(t, err) sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger()) require.Nil(t, err) require.NotNil(t, sender) @@ -72,6 +75,9 @@ func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) { err := relayNode.Start(context.Background()) require.Nil(t, err) defer relayNode.Stop() + + _, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic")) + require.Nil(t, err) sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger()) check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)} diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 144ce681..99224dc0 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -164,7 +164,7 @@ func Test500(t *testing.T) { sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) require.NoError(t, err) - sub2, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) + sub2, err := wakuNode2.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) require.NoError(t, err) wg := sync.WaitGroup{} @@ -404,7 +404,7 @@ func TestStaticShardingMultipleTopics(t *testing.T) { pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321)) pubSubTopic3Str := pubSubTopic3.String() _, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str)) - require.NoError(t, err) + require.Error(t, err) time.Sleep(100 * time.Millisecond) diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index ad590901..5aff474e 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -8,7 +8,9 @@ import ( "time" "github.com/stretchr/testify/suite" + "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -213,13 +215,17 @@ func (s *FilterTestSuite) TestStaticSharding() { // Test positive case for static shard pubsub topic - message gets received s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""}) - // Test two negative cases for static shard pubsub topic - message times out - s.waitForTimeout(&WakuMsg{testTopics[0], s.TestContentTopic, ""}) + // Test two negative cases for static shard pubsub topic + msg := &WakuMsg{testTopics[0], s.TestContentTopic, ""} + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic)) + s.Require().Error(err) - s.waitForTimeout(&WakuMsg{testTopics[1], s.TestContentTopic, ""}) + msg = &WakuMsg{testTopics[1], s.TestContentTopic, ""} + _, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic)) + s.Require().Error(err) // Cleanup - _, err := s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ + _, err = s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ PubsubTopic: s.TestTopic, ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic), }) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 0dc1ea9e..6ff17f63 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -336,7 +336,8 @@ func TestWakuLightPushCornerCases(t *testing.T) { // Test corner case with default pubSub topic _, err = client.Publish(ctx, msg2, WithDefaultPubsubTopic(), WithPeer(host2.ID())) - require.NoError(t, err) + require.Error(t, err) + require.Equal(t, "lightpush errorCould not publish message: cannot publish to unsubscribed topic", err.Error()) // Test situation when cancel func is nil lightPushNode2.cancel = nil @@ -405,6 +406,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) { // Check that msg2 publish finished without message delivery for unconfigured topic _, err = client.Publish(ctx, msg2, WithPubSubTopic("/waku/2/rsv/25/0"), WithPeer(host2.ID())) - require.NoError(t, err) + require.Error(t, err) + require.Equal(t, "lightpush errorCould not publish message: cannot publish to unsubscribed topic", err.Error()) tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, sub3.Ch) } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 936b37c5..2ff8329a 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -280,12 +280,20 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . if err != nil { return pb.MessageHash{}, err } + _, err = w.subscribeToPubsubTopic(params.pubsubTopic) + if err != nil { + return pb.MessageHash{}, err + } } if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) { return pb.MessageHash{}, errors.New("not enough peers to publish") } + if !w.IsSubscribed(params.pubsubTopic) { + return pb.MessageHash{}, errors.New("cannot publish to unsubscribed topic") + } + w.topicsMutex.Lock() defer w.topicsMutex.Unlock() diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 307d3ee3..1e2f3781 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -73,6 +73,60 @@ func TestWakuRelay(t *testing.T) { <-ctx.Done() } +func TestWakuRelayUnsubscribedTopic(t *testing.T) { + testTopic := defaultTestPubSubTopic + anotherTopic := "/waku/2/go/relay/another-topic" + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + bcaster := NewBroadcaster(10) + relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + relay.SetHost(host) + err = relay.Start(context.Background()) + require.NoError(t, err) + + err = bcaster.Start(context.Background()) + require.NoError(t, err) + defer relay.Stop() + + subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic)) + + require.NoError(t, err) + + require.Equal(t, relay.IsSubscribed(testTopic), true) + require.Equal(t, relay.IsSubscribed(anotherTopic), false) + + topics := relay.Topics() + require.Equal(t, 1, len(topics)) + require.Equal(t, testTopic, topics[0]) + + ctx, cancel := context.WithCancel(context.Background()) + bytesToSend := []byte{1} + go func() { + defer cancel() + env := <-subs[0].Ch + if env != nil { + t.Log("received msg", logging.Hash(env.Hash())) + } + }() + + msg := &pb.WakuMessage{ + Payload: bytesToSend, + ContentTopic: "test", + } + _, err = relay.Publish(context.Background(), msg, WithPubSubTopic(anotherTopic)) + require.Error(t, err) + + time.Sleep(2 * time.Second) + + err = relay.Unsubscribe(ctx, protocol.NewContentFilter(testTopic)) + require.NoError(t, err) + <-ctx.Done() +} + func createRelayNode(t *testing.T) (host.Host, *WakuRelay) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err)