fix: check subscription when relay publish message (#1212)

This commit is contained in:
kaichao 2024-08-31 09:22:59 +08:00 committed by GitHub
parent 690849c986
commit 99d2477035
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 99 additions and 11 deletions

View File

@ -2,6 +2,7 @@ package rest
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
@ -13,6 +14,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -22,8 +24,13 @@ func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNod
node1 := createNode(t, node.WithLightPush(), node.WithWakuRelay())
node2 := createNode(t, node.WithLightPush(), node.WithWakuRelay())
_, err := node1.Relay().Subscribe(context.Background(), protocol.NewContentFilter(pubSubTopic))
require.NoError(t, err)
_, err = node2.Relay().Subscribe(context.Background(), protocol.NewContentFilter(pubSubTopic))
require.NoError(t, err)
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1)
err = node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1)
require.NoError(t, err)
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic})
require.NoError(t, err)

View File

@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -34,8 +35,9 @@ func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService {
func TestPostV1Message(t *testing.T) {
router := chi.NewRouter()
testTopic := "test"
_ = makeRelayService(t, router)
r := makeRelayService(t, router)
msg := &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
@ -44,8 +46,11 @@ func TestPostV1Message(t *testing.T) {
msgJSONBytes, err := json.Marshal(msg)
require.NoError(t, err)
_, err = r.node.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJSONBytes))
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/"+testTopic, bytes.NewReader(msgJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())

View File

@ -50,6 +50,9 @@ func TestNewSenderWithRelay(t *testing.T) {
err := relayNode.Start(context.Background())
require.Nil(t, err)
defer relayNode.Stop()
_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger())
require.Nil(t, err)
require.NotNil(t, sender)
@ -72,6 +75,9 @@ func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) {
err := relayNode.Start(context.Background())
require.Nil(t, err)
defer relayNode.Stop()
_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger())
check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)}

View File

@ -164,7 +164,7 @@ func Test500(t *testing.T) {
sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
sub2, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
sub2, err := wakuNode2.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
wg := sync.WaitGroup{}
@ -404,7 +404,7 @@ func TestStaticShardingMultipleTopics(t *testing.T) {
pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321))
pubSubTopic3Str := pubSubTopic3.String()
_, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str))
require.NoError(t, err)
require.Error(t, err)
time.Sleep(100 * time.Millisecond)

View File

@ -8,7 +8,9 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -213,13 +215,17 @@ func (s *FilterTestSuite) TestStaticSharding() {
// Test positive case for static shard pubsub topic - message gets received
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""})
// Test two negative cases for static shard pubsub topic - message times out
s.waitForTimeout(&WakuMsg{testTopics[0], s.TestContentTopic, ""})
// Test two negative cases for static shard pubsub topic
msg := &WakuMsg{testTopics[0], s.TestContentTopic, ""}
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic))
s.Require().Error(err)
s.waitForTimeout(&WakuMsg{testTopics[1], s.TestContentTopic, ""})
msg = &WakuMsg{testTopics[1], s.TestContentTopic, ""}
_, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic))
s.Require().Error(err)
// Cleanup
_, err := s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
_, err = s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.TestTopic,
ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic),
})

View File

@ -336,7 +336,8 @@ func TestWakuLightPushCornerCases(t *testing.T) {
// Test corner case with default pubSub topic
_, err = client.Publish(ctx, msg2, WithDefaultPubsubTopic(), WithPeer(host2.ID()))
require.NoError(t, err)
require.Error(t, err)
require.Equal(t, "lightpush errorCould not publish message: cannot publish to unsubscribed topic", err.Error())
// Test situation when cancel func is nil
lightPushNode2.cancel = nil
@ -405,6 +406,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) {
// Check that msg2 publish finished without message delivery for unconfigured topic
_, err = client.Publish(ctx, msg2, WithPubSubTopic("/waku/2/rsv/25/0"), WithPeer(host2.ID()))
require.NoError(t, err)
require.Error(t, err)
require.Equal(t, "lightpush errorCould not publish message: cannot publish to unsubscribed topic", err.Error())
tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, sub3.Ch)
}

View File

@ -280,12 +280,20 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
if err != nil {
return pb.MessageHash{}, err
}
_, err = w.subscribeToPubsubTopic(params.pubsubTopic)
if err != nil {
return pb.MessageHash{}, err
}
}
if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) {
return pb.MessageHash{}, errors.New("not enough peers to publish")
}
if !w.IsSubscribed(params.pubsubTopic) {
return pb.MessageHash{}, errors.New("cannot publish to unsubscribed topic")
}
w.topicsMutex.Lock()
defer w.topicsMutex.Unlock()

View File

@ -73,6 +73,60 @@ func TestWakuRelay(t *testing.T) {
<-ctx.Done()
}
func TestWakuRelayUnsubscribedTopic(t *testing.T) {
testTopic := defaultTestPubSubTopic
anotherTopic := "/waku/2/go/relay/another-topic"
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
err = bcaster.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(testTopic), true)
require.Equal(t, relay.IsSubscribed(anotherTopic), false)
topics := relay.Topics()
require.Equal(t, 1, len(topics))
require.Equal(t, testTopic, topics[0])
ctx, cancel := context.WithCancel(context.Background())
bytesToSend := []byte{1}
go func() {
defer cancel()
env := <-subs[0].Ch
if env != nil {
t.Log("received msg", logging.Hash(env.Hash()))
}
}()
msg := &pb.WakuMessage{
Payload: bytesToSend,
ContentTopic: "test",
}
_, err = relay.Publish(context.Background(), msg, WithPubSubTopic(anotherTopic))
require.Error(t, err)
time.Sleep(2 * time.Second)
err = relay.Unsubscribe(ctx, protocol.NewContentFilter(testTopic))
require.NoError(t, err)
<-ctx.Done()
}
func createRelayNode(t *testing.T) (host.Host, *WakuRelay) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)