From 90309079603cd9a398452c41673a9caf28ec1ce5 Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Thu, 28 Oct 2021 14:41:17 +0200 Subject: [PATCH] test: add test for lightpush (#106) * test: add test for lightpush Co-authored-by: Richard Ramos --- tests/utils.go | 69 +++++++++++ waku/v2/protocol/lightpush/waku_lightpush.go | 6 +- .../protocol/lightpush/waku_lightpush_test.go | 110 ++++++++++++++++++ waku/v2/protocol/relay/waku_relay.go | 2 - waku/v2/protocol/relay/waku_relay_test.go | 61 +--------- .../store/waku_store_protocol_test.go | 9 +- 6 files changed, 188 insertions(+), 69 deletions(-) create mode 100644 tests/utils.go create mode 100644 waku/v2/protocol/lightpush/waku_lightpush_test.go diff --git a/tests/utils.go b/tests/utils.go new file mode 100644 index 00000000..481a3beb --- /dev/null +++ b/tests/utils.go @@ -0,0 +1,69 @@ +package tests + +import ( + "context" + "fmt" + "io" + "net" + "testing" + + "github.com/ethereum/go-ethereum/log" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + "github.com/multiformats/go-multiaddr" + ma "github.com/multiformats/go-multiaddr" +) + +func GetHostAddress(ha host.Host) ma.Multiaddr { + return ha.Addrs()[0] +} + +func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) { + t.Helper() + + if host == "" { + host = "localhost" + } + + for i := 0; i < maxAttempts; i++ { + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, "0")) + if err != nil { + t.Logf("unable to resolve tcp addr: %v", err) + continue + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + l.Close() + t.Logf("unable to listen on addr %q: %v", addr, err) + continue + } + + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port, nil + + } + + return 0, fmt.Errorf("no free port found") +} + +func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) { + // Creates a new RSA key pair for this host. + prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness) + if err != nil { + log.Error(err.Error()) + return nil, err + } + + // 0.0.0.0 will listen on any interface device. + sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)) + + // libp2p.New constructs a new libp2p Host. + // Other options can be added here. + return libp2p.New( + ctx, + libp2p.ListenAddrs(sourceMultiAddr), + libp2p.Identity(prvKey), + ) +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index f46f6746..6d55546f 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -39,8 +39,10 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) wakuLP.ctx = ctx wakuLP.h = h - wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) - log.Info("Light Push protocol started") + if relay != nil { + wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) + log.Info("Light Push protocol started") + } return wakuLP } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go new file mode 100644 index 00000000..afda245c --- /dev/null +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -0,0 +1,110 @@ +package lightpush + +import ( + "context" + "crypto/rand" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peerstore" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/status-im/go-waku/tests" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/relay" + "github.com/stretchr/testify/require" +) + +func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *pubsub.Subscription, host.Host) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + relay, err := relay.NewWakuRelay(context.Background(), host) + require.NoError(t, err) + + sub, _, err := relay.Subscribe(topic) + require.NoError(t, err) + + return relay, sub, host +} + +// Node1: Relay +// Node2: Relay+Lightpush +// Client that will lightpush a message +// +// Node1 and Node 2 are peers +// Client and Node 2 are peers +// Node 3 will use lightpush request, sending the message to Node2 +// +// Client send a succesful message using lightpush +// Node2 receive the message and broadcast it +// Node1 receive the message +func TestWakuLightPush(t *testing.T) { + var testTopic relay.Topic = "/waku/2/go/lightpush/test" + node1, sub1, host1 := makeWakuRelay(t, testTopic) + defer node1.Stop() + defer sub1.Cancel() + + node2, sub2, host2 := makeWakuRelay(t, testTopic) + defer node2.Stop() + defer sub2.Cancel() + + ctx := context.Background() + lightPushNode2 := NewWakuLightPush(ctx, host2, node2) + defer lightPushNode2.Stop() + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + client := NewWakuLightPush(ctx, clientHost, nil) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200)) + require.NoError(t, err) + + err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID())) + require.NoError(t, err) + + clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = clientHost.Peerstore().AddProtocols(host2.ID(), string(LightPushID_v20beta1)) + require.NoError(t, err) + + req := new(pb.PushRequest) + req.Message = &pb.WakuMessage{ + Payload: []byte{1}, + Version: 0, + ContentTopic: "test", + Timestamp: 0, + } + req.PubsubTopic = string(testTopic) + + // Wait for the mesh connection to happen between node1 and node2 + time.Sleep(5 * time.Second) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + _, err := sub1.Next(context.Background()) + require.NoError(t, err) + }() + + wg.Add(1) + go func() { + defer wg.Done() + _, err := sub2.Next(context.Background()) + require.NoError(t, err) + }() + + resp, err := client.Request(ctx, req, []LightPushOption{}...) + require.NoError(t, err) + require.True(t, resp.IsSuccess) + + wg.Wait() +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 4b8bb82a..b5f22d56 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -136,7 +136,6 @@ func (w *WakuRelay) Subscribe(topic Topic) (subs *pubsub.Subscription, isNew boo func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. - if w.pubsub == nil { return nil, errors.New("PubSub hasn't been set") } @@ -157,7 +156,6 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic } err = pubSubTopic.Publish(ctx, out) - if err != nil { return nil, err } diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index e265d7fd..7b57ea4a 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -3,75 +3,20 @@ package relay import ( "context" "crypto/rand" - "fmt" - "io" - "net" "testing" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/host" - "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/stretchr/testify/require" ) -func findFreePort(t *testing.T, host string, maxAttempts int) (int, error) { - t.Helper() - - if host == "" { - host = "localhost" - } - - for i := 0; i < maxAttempts; i++ { - addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, "0")) - if err != nil { - t.Logf("unable to resolve tcp addr: %v", err) - continue - } - l, err := net.ListenTCP("tcp", addr) - if err != nil { - l.Close() - t.Logf("unable to listen on addr %q: %v", addr, err) - continue - } - - port := l.Addr().(*net.TCPAddr).Port - l.Close() - return port, nil - - } - - return 0, fmt.Errorf("no free port found") -} - -func makeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) { - // Creates a new RSA key pair for this host. - prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness) - if err != nil { - log.Error(err) - return nil, err - } - - // 0.0.0.0 will listen on any interface device. - sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)) - - // libp2p.New constructs a new libp2p Host. - // Other options can be added here. - return libp2p.New( - ctx, - libp2p.ListenAddrs(sourceMultiAddr), - libp2p.Identity(prvKey), - ) -} - func TestWakuRelay(t *testing.T) { var testTopic Topic = "/waku/2/go/relay/test" - port, err := findFreePort(t, "", 5) + port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) - host, err := makeHost(context.Background(), port, rand.Reader) + host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) relay, err := NewWakuRelay(context.Background(), host) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 3b7218ed..02d9a71c 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -5,19 +5,14 @@ import ( "testing" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peerstore" - ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) -func getHostAddress(ha host.Host) ma.Multiaddr { - return ha.Addrs()[0] -} - func TestWakuStoreProtocolQuery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -48,7 +43,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { s2.Start(ctx, host2) defer s2.Stop() - host2.Peerstore().AddAddr(host1.ID(), getHostAddress(host1), peerstore.PermanentAddrTTL) + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err)