diff --git a/tests/utils.go b/tests/utils.go index 10c99be6..8b9dec22 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -9,6 +9,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/waku-org/go-waku/waku/v2/protocol" "io" "math" "math/big" @@ -16,7 +17,9 @@ import ( "net/url" "strconv" "strings" + "sync" "testing" + "time" "unicode/utf8" gcrypto "github.com/ethereum/go-ethereum/crypto" @@ -385,3 +388,36 @@ func GenerateRandomSQLInsert(maxLength int) (string, error) { return query, nil } + +func WaitForMsg(t *testing.T, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) { + wg.Add(1) + log := utils.Logger() + go func() { + defer wg.Done() + select { + case env := <-ch: + msg := env.Message() + log.Info("Received ", zap.String("msg", msg.String())) + case <-time.After(timeout): + require.Fail(t, "Message timeout") + } + }() + wg.Wait() +} + +func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) { + wg.Add(1) + go func() { + defer wg.Done() + select { + case _, ok := <-ch: + require.False(t, ok, "should not retrieve message") + case <-time.After(timeout): + // All good + case <-ctx.Done(): + require.Fail(t, "test exceeded allocated time") + } + }() + + wg.Wait() +} diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 1912fc78..d4ca453c 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -3,12 +3,17 @@ package node import ( "bytes" "context" + "fmt" "math/big" + "math/rand" "net" + "os" "sync" "testing" "time" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/prometheus/client_golang/prometheus" @@ -99,6 +104,7 @@ func TestUpAndDown(t *testing.T) { WithWakuRelay(), WithDiscoveryV5(0, bootnodes, true), ) + require.NoError(t, err) for i := 0; i < 5; i++ { @@ -319,3 +325,218 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.Len(t, result.Messages, 1) require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp) } + +func TestStaticShardingMultipleTopics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testClusterID := uint16(20) + + // Node1 with Relay + hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + wakuNode1, err := New( + WithHostAddress(hostAddr1), + WithWakuRelay(), + WithClusterID(testClusterID), + ) + require.NoError(t, err) + err = wakuNode1.Start(ctx) + require.NoError(t, err) + defer wakuNode1.Stop() + + pubSubTopic1 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(0)) + pubSubTopic1Str := pubSubTopic1.String() + contentTopic1 := "/test/2/my-app/sharded" + + pubSubTopic2 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(10)) + pubSubTopic2Str := pubSubTopic2.String() + contentTopic2 := "/test/3/my-app/sharded" + + require.Equal(t, testClusterID, wakuNode1.ClusterID()) + + r := wakuNode1.Relay() + + subs1, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic1Str, contentTopic1)) + require.NoError(t, err) + + subs2, err := r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic2Str, contentTopic2)) + require.NoError(t, err) + + require.NotEqual(t, subs1[0].ID, subs2[0].ID) + + require.True(t, r.IsSubscribed(pubSubTopic1Str)) + require.True(t, r.IsSubscribed(pubSubTopic2Str)) + + s1, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic1Str, contentTopic1) + require.NoError(t, err) + s2, err := r.GetSubscriptionWithPubsubTopic(pubSubTopic2Str, contentTopic2) + require.NoError(t, err) + require.Equal(t, s1.ID, subs1[0].ID) + require.Equal(t, s2.ID, subs2[0].ID) + + // Wait for subscriptions + time.Sleep(1 * time.Second) + + // Send message to subscribed topic + msg := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message") + + _, err = r.Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic1Str)) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + var wg sync.WaitGroup + wg.Add(1) + // Message msg could be retrieved + go func() { + defer wg.Done() + env, ok := <-subs1[0].Ch + require.True(t, ok, "no message retrieved") + require.Equal(t, msg.Timestamp, env.Message().Timestamp) + }() + + wg.Wait() + + // Send another message to non-subscribed pubsub topic, but subscribed content topic + msg2 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message 2") + pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321)) + pubSubTopic3Str := pubSubTopic3.String() + _, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str)) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // No message could be retrieved + tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch) + + // Send another message to subscribed pubsub topic, but not subscribed content topic - mix it up + msg3 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch(), "test message 3") + + _, err = r.Publish(ctx, msg3, relay.WithPubSubTopic(pubSubTopic1Str)) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // No message could be retrieved + tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch) + +} + +func TestStaticShardingLimits(t *testing.T) { + + log := utils.Logger() + + if os.Getenv("RUN_FLAKY_TESTS") != "true" { + + log.Info("Skipping", zap.String("test", t.Name()), + zap.String("reason", "RUN_FLAKY_TESTS environment variable is not set to true")) + t.SkipNow() + } + + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + testClusterID := uint16(21) + + // Node1 with Relay + hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + discv5UDPPort1, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3) + require.NoError(t, err) + wakuNode1, err := New( + WithHostAddress(hostAddr1), + WithWakuRelay(), + WithClusterID(testClusterID), + WithDiscoveryV5(uint(discv5UDPPort1), nil, true), + ) + require.NoError(t, err) + err = wakuNode1.Start(ctx) + require.NoError(t, err) + defer wakuNode1.Stop() + + // Node2 with Relay + hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + discv5UDPPort2, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3) + require.NoError(t, err) + wakuNode2, err := New( + WithHostAddress(hostAddr2), + WithWakuRelay(), + WithClusterID(testClusterID), + WithDiscoveryV5(uint(discv5UDPPort2), []*enode.Node{wakuNode1.localNode.Node()}, true), + ) + require.NoError(t, err) + err = wakuNode2.Start(ctx) + require.NoError(t, err) + defer wakuNode2.Stop() + + err = wakuNode1.DiscV5().Start(ctx) + require.NoError(t, err) + err = wakuNode2.DiscV5().Start(ctx) + require.NoError(t, err) + + // Wait for discovery + time.Sleep(3 * time.Second) + + contentTopic1 := "/test/2/my-app/sharded" + + r1 := wakuNode1.Relay() + r2 := wakuNode2.Relay() + + var shardedPubSubTopics []string + + // Subscribe topics related to static sharding + for i := 0; i < 1024; i++ { + shardedPubSubTopics = append(shardedPubSubTopics, fmt.Sprintf("/waku/2/rs/%d/%d", testClusterID, i)) + _, err = r1.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1)) + require.NoError(t, err) + time.Sleep(10 * time.Millisecond) + } + + // Let ENR updates to finish + time.Sleep(3 * time.Second) + + // Subscribe topics related to static sharding + for i := 0; i < 1024; i++ { + _, err = r2.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1)) + require.NoError(t, err) + time.Sleep(10 * time.Millisecond) + } + + // Let ENR updates to finish + time.Sleep(3 * time.Second) + + // Check ENR value after 1024 subscriptions + shardsENR, err := wenr.RelaySharding(wakuNode1.ENR().Record()) + require.NoError(t, err) + require.Equal(t, testClusterID, shardsENR.ClusterID) + require.Equal(t, 1, len(shardsENR.ShardIDs)) + + // Prepare message + msg1 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message") + + // Select shard to publish + randomShard := rand.Intn(1024) + + // Check both nodes are subscribed + require.True(t, r1.IsSubscribed(shardedPubSubTopics[randomShard])) + require.True(t, r2.IsSubscribed(shardedPubSubTopics[randomShard])) + + time.Sleep(1 * time.Second) + + // Publish on node1 + _, err = r1.Publish(ctx, msg1, relay.WithPubSubTopic(shardedPubSubTopics[randomShard])) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + s2, err := r2.GetSubscriptionWithPubsubTopic(shardedPubSubTopics[randomShard], contentTopic1) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Retrieve on node2 + tests.WaitForMsg(t, 2*time.Second, &wg, s2.Ch) + +} diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index e5dac718..be5a0e2c 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -543,3 +543,67 @@ func (s *FilterTestSuite) BeforeTest(suiteName, testName string) { func (s *FilterTestSuite) AfterTest(suiteName, testName string) { s.log.Info("Finished executing ", zap.String("testName", testName)) } + +func (s *FilterTestSuite) TestStaticSharding() { + log := utils.Logger() + s.log = log + s.wg = &sync.WaitGroup{} + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel + + // Gen pubsub topic "/waku/2/rs/100/100" + s.testTopic = protocol.NewStaticShardingPubsubTopic(uint16(100), uint16(100)).String() + + // Pubsub topics for neg. test cases + testTopics := []string{ + "/waku/2/rs/100/1024", + "/waku/2/rs/100/101", + } + s.testContentTopic = "/test/10/my-filter-app/proto" + + // Prepare new nodes + s.lightNode = s.makeWakuFilterLightNode(true, true) + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true) + + // Connect nodes + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + msg := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()) + + // Test positive case for static shard pubsub topic - message gets received + s.waitForMsg(func() { + _, err := s.relayNode.Publish(s.ctx, msg, relay.WithPubSubTopic(s.testTopic)) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + // Test two negative cases for static shard pubsub topic - message times out + s.waitForTimeout(func() { + _, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[0])) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + s.waitForTimeout(func() { + _, err := s.relayNode.Publish(s.ctx, msg2, relay.WithPubSubTopic(testTopics[1])) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + // Cleanup + _, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ + PubsubTopic: s.testTopic, + ContentTopics: protocol.NewContentTopicSet(s.testContentTopic), + }) + s.Require().NoError(err) + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) +} diff --git a/waku/v2/protocol/legacy_store/waku_store_protocol_test.go b/waku/v2/protocol/legacy_store/waku_store_protocol_test.go index b0b315fc..e240541a 100644 --- a/waku/v2/protocol/legacy_store/waku_store_protocol_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_protocol_test.go @@ -426,3 +426,86 @@ func TestWakuStoreStart(t *testing.T) { defer s2.Stop() } + +func TestWakuStoreWithStaticSharding(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s1.SetHost(host1) + + // Prepare pubsub topics for static sharding + pubSubTopics := protocol.ShardsToTopics(20, []int{1, 2, 3, 4}) + + // Prepare test messages + now := *utils.GetUnixEpoch() + msg1 := tests.CreateWakuMessage("hello", proto.Int64(now)) + + nowPlusOne := proto.Int64(now + 1) + msg2 := tests.CreateWakuMessage("/test/2/my-app/sharded", nowPlusOne) + + nowPlusTwo := proto.Int64(now + 2) + msg3 := tests.CreateWakuMessage("/test/2/my-app/sharded", nowPlusTwo) + + // Subscribe to pubSubtopics and start store1 + host1 with them + sub := SimulateSubscription([]*protocol.Envelope{ + protocol.NewEnvelope(msg1, *utils.GetUnixEpoch(), pubSubTopics[0]), + protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubSubTopics[1]), + protocol.NewEnvelope(msg3, *utils.GetUnixEpoch(), pubSubTopics[2]), + }) + err = s1.Start(ctx, sub) + require.NoError(t, err) + defer s1.Stop() + + host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) + require.NoError(t, err) + + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s2.SetHost(host2) + + // Subscribe to different pubSubTopics[3] at store2 + host2 + sub1 := relay.NewSubscription(protocol.NewContentFilter(pubSubTopics[3])) + + err = s2.Start(ctx, sub1) + require.NoError(t, err) + defer s2.Stop() + + q1 := Query{ + PubsubTopic: pubSubTopics[0], + } + + fn1 := func(msg *pb.WakuMessage) (bool, error) { + return msg.ContentTopic == "hello", nil + } + + // Find msg1 on the second host2+s2 + foundMsg, err := s2.Find(ctx, q1, fn1, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) + require.NoError(t, err) + require.NotNil(t, foundMsg) + require.Equal(t, "hello", foundMsg.ContentTopic) + + q2 := Query{ + PubsubTopic: pubSubTopics[1], + } + + // Find msg2 on the second host2+s2; No other messages (msg3) should be found + result, err := s2.Query(ctx, q2, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) + require.NoError(t, err) + + for i, m := range result.Messages { + if i == 0 { + require.Equal(t, "/test/2/my-app/sharded", m.ContentTopic) + require.Equal(t, nowPlusOne, m.Timestamp) + } else { + require.Fail(t, "Unexpected message found") + } + } + +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 08c134ee..189c21c8 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -7,14 +7,12 @@ import ( "testing" "time" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "go.uber.org/zap" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -42,22 +40,6 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su return relay, sub[0], host } -func waitForMsg(t *testing.T, wg *sync.WaitGroup, ch chan *protocol.Envelope) { - wg.Add(1) - log := utils.Logger() - go func() { - defer wg.Done() - select { - case env := <-ch: - msg := env.Message() - log.Info("Received ", zap.String("msg", msg.String())) - case <-time.After(2 * time.Second): - require.Fail(t, "Message timeout") - } - }() - wg.Wait() -} - // Node1: Relay // Node2: Relay+Lightpush // Client that will lightpush a message @@ -305,7 +287,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { require.NoError(t, err) // Wait for the nominal case message at node1 - waitForMsg(t, &wg, sub1.Ch) + tests.WaitForMsg(t, 2*time.Second, &wg, sub1.Ch) // Test error case with nil message _, err = client.Publish(ctx, nil, lpOptions...) @@ -332,3 +314,70 @@ func TestWakuLightPushCornerCases(t *testing.T) { // Test situation when cancel func is nil lightPushNode2.cancel = nil } + +func TestWakuLightPushWithStaticSharding(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Prepare pubsub topic for static sharding + pubSubTopic := protocol.NewStaticShardingPubsubTopic(uint16(25), uint16(0)).String() + testContentTopic := "/test/10/my-lp-app/proto" + + // Node topology: clientNode (lightpush client) <-> node2(relay+lightpush server) <-> node3(relay) + // ClientNode + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) + client.SetHost(clientHost) + + // Node2 + node2, sub2, host2 := makeWakuRelay(t, pubSubTopic) + defer node2.Stop() + defer sub2.Unsubscribe() + + lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2.SetHost(host2) + err = lightPushNode2.Start(ctx) + require.NoError(t, err) + defer lightPushNode2.Stop() + + // Node3 + node3, sub3, host3 := makeWakuRelay(t, pubSubTopic) + defer node3.Stop() + defer sub3.Unsubscribe() + + // Add path clientNode (lightpush client) -> node2(relay+lightpush server) + clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) + require.NoError(t, err) + + // Add path node2(relay+lightpush server) -> node3(relay) + host2.Peerstore().AddAddr(host3.ID(), tests.GetHostAddress(host3), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host3.ID(), relay.WakuRelayID_v200) + require.NoError(t, err) + + err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host3.ID())) + require.NoError(t, err) + + // Create messages + msg := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()) + + // Wait for the mesh connection to happen between nodes + time.Sleep(2 * time.Second) + + var wg sync.WaitGroup + + // Check that msg publish has led to message deliver for existing topic + _, err = client.Publish(ctx, msg, WithPubSubTopic(pubSubTopic), WithPeer(host2.ID())) + require.NoError(t, err) + tests.WaitForMsg(t, 2*time.Second, &wg, sub3.Ch) + + // Check that msg2 publish finished without message delivery for unconfigured topic + _, err = client.Publish(ctx, msg2, WithPubSubTopic("/waku/2/rsv/25/0"), WithPeer(host2.ID())) + require.NoError(t, err) + tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, sub3.Ch) +} diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 7dc84858..307d3ee3 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -355,3 +355,77 @@ func TestInvalidMessagePublish(t *testing.T) { ctxCancel() } + +func TestWakuRelayStaticSharding(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Follow spec /waku/2/rs// + testTopic := "/waku/2/rs/64/0" + testContentTopic := "/test/10/my-relay" + + // Host1 + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host1, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + bcaster1 := NewBroadcaster(10) + relay1 := NewWakuRelay(bcaster1, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + relay1.SetHost(host1) + err = relay1.Start(context.Background()) + require.NoError(t, err) + + err = bcaster1.Start(context.Background()) + require.NoError(t, err) + defer relay1.Stop() + + // Host2 + port, err = tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host2, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + bcaster2 := NewBroadcaster(10) + relay2 := NewWakuRelay(bcaster2, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + relay2.SetHost(host2) + err = relay2.Start(context.Background()) + require.NoError(t, err) + + err = bcaster2.Start(context.Background()) + require.NoError(t, err) + defer relay2.Stop() + + // Connect nodes + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), WakuRelayID_v200) + require.NoError(t, err) + + // Wait for the mesh connection to happen between node1 and node2 + time.Sleep(2 * time.Second) + + // Subscribe to valid static shard topic on both hosts + subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + require.NoError(t, err) + + subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic)) + require.NoError(t, err) + require.True(t, relay2.IsSubscribed(testTopic)) + require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0]) + + msg := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch(), "test_payload") + + // Test publish from host2 using autosharding -> should fail on topic format + _, err = relay2.Publish(ctx, msg) + require.Error(t, err) + + // Test publish from host2 using static sharding -> should succeed + _, err = relay2.Publish(ctx, msg, WithPubSubTopic(testTopic)) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Msg should get received on host1 + tests.WaitForMsg(t, 2*time.Second, &wg, subs1[0].Ch) + +}