diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index c46077cc..81112746 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -3,6 +3,8 @@ package lightpush import ( "context" "crypto/rand" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "go.uber.org/zap" "sync" "testing" "time" @@ -39,6 +41,22 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su return relay, sub[0], host } +func waitForMsg(t *testing.T, wg *sync.WaitGroup, ch chan *protocol.Envelope) { + wg.Add(1) + log := utils.Logger() + go func() { + defer wg.Done() + select { + case env := <-ch: + msg := env.Message() + log.Info("Received ", zap.String("msg", msg.String())) + case <-time.After(2 * time.Second): + require.Fail(t, "Message timeout") + } + }() + wg.Wait() +} + // Node1: Relay // Node2: Relay+Lightpush // Client that will lightpush a message @@ -226,3 +244,90 @@ func TestWakuLightPushAutoSharding(t *testing.T) { wg.Wait() } + +func TestWakuLightPushCornerCases(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testTopic := "/waku/2/go/lightpush/test" + testContentTopic := "/test/10/my-lp-app/proto" + + // Prepare peer manager instance to include in test + pm := peermanager.NewPeerManager(10, 10, utils.Logger()) + + node1, sub1, host1 := makeWakuRelay(t, testTopic) + defer node1.Stop() + defer sub1.Unsubscribe() + + node2, sub2, host2 := makeWakuRelay(t, testTopic) + defer node2.Stop() + defer sub2.Unsubscribe() + + lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2.SetHost(host2) + err := lightPushNode2.Start(ctx) + require.NoError(t, err) + defer lightPushNode2.Stop() + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) + client.SetHost(clientHost) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200) + require.NoError(t, err) + + err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID())) + require.NoError(t, err) + + clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) + require.NoError(t, err) + + msg2 := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()) + + // Wait for the mesh connection to happen between node1 and node2 + time.Sleep(2 * time.Second) + + var wg sync.WaitGroup + + var lpOptions []Option + lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) + lpOptions = append(lpOptions, WithPeer(host2.ID())) + + // Check that msg publish has passed for nominal case + _, err = client.Publish(ctx, msg2, lpOptions...) + require.NoError(t, err) + + // Wait for the nominal case message at node1 + waitForMsg(t, &wg, sub1.Ch) + + // Test error case with nil message + _, err = client.Publish(ctx, nil, lpOptions...) + require.Error(t, err) + + // Create new "dummy" host - not related to any node + host3, err := tests.MakeHost(context.Background(), 12345, rand.Reader) + require.NoError(t, err) + + var lpOptions2 []Option + + // Test error case with empty options + _, err = client.Publish(ctx, msg2, lpOptions2...) + require.Error(t, err) + + // Test error case with unrelated host + _, err = client.Publish(ctx, msg2, WithPubSubTopic(testTopic), WithPeer(host3.ID())) + require.Error(t, err) + + // Test corner case with default pubSub topic + _, err = client.Publish(ctx, msg2, WithDefaultPubsubTopic(), WithPeer(host2.ID())) + require.NoError(t, err) + + // Test situation when cancel func is nil + lightPushNode2.cancel = nil +}