From 7eb7647599e72de5aa837ac5edd203782f1bd374 Mon Sep 17 00:00:00 2001
From: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Date: Fri, 13 Dec 2024 17:26:49 +0100
Subject: [PATCH] chore: adding relay test (#8)

---
 waku/nwaku.go      |   9 ++++
 waku/nwaku_test.go | 103 +++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 103 insertions(+), 9 deletions(-)

diff --git a/waku/nwaku.go b/waku/nwaku.go
index 070e233..b6ab7d7 100644
--- a/waku/nwaku.go
+++ b/waku/nwaku.go
@@ -424,6 +424,11 @@ func (w *Waku) DialPeer(address multiaddr.Multiaddr) error {
 	return w.node.Connect(ctx, address)
 }
 
+// TODO: change pubsub topic to shard notation everywhere
+func (w *Waku) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
+	return w.node.RelayPublish(ctx, message, pubsubTopic)
+}
+
 func (w *Waku) DialPeerByID(peerID peer.ID, protocol libp2pproto.ID) error {
 	ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
 	defer cancel()
@@ -1241,3 +1246,7 @@ func getContextTimeoutMilliseconds(ctx context.Context) int {
 	}
 	return 0
 }
+
+func FormatWakuRelayTopic(clusterId uint16, shard uint16) string {
+	return fmt.Sprintf("/waku/2/rs/%d/%d", clusterId, shard)
+}
diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go
index d558511..1301fdb 100644
--- a/waku/nwaku_test.go
+++ b/waku/nwaku_test.go
@@ -4,15 +4,18 @@ import (
 	"context"
 	"errors"
 	"slices"
+	"sync"
 	"testing"
 	"time"
 
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/cenkalti/backoff/v3"
 	"github.com/libp2p/go-libp2p/core/peer"
 	ma "github.com/multiformats/go-multiaddr"
 	"github.com/stretchr/testify/require"
+	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
 	"github.com/waku-org/go-waku/waku/v2/protocol/store"
 )
 
@@ -190,7 +193,7 @@ func TestPeerExchange(t *testing.T) {
 		ClusterID:     16,
 		Shards:        []uint16{64},
 		PeerExchange:  false,
-		Discv5UdpPort: 9001,
+		Discv5UdpPort: 9010,
 		TcpPort:       60010,
 	}
 
@@ -214,7 +217,7 @@ func TestPeerExchange(t *testing.T) {
 		ClusterID:            16,
 		Shards:               []uint16{64},
 		PeerExchange:         true,
-		Discv5UdpPort:        9000,
+		Discv5UdpPort:        9011,
 		Discv5BootstrapNodes: []string{discv5NodeEnr.String()},
 		TcpPort:              60011,
 	}
@@ -259,7 +262,7 @@ func TestPeerExchange(t *testing.T) {
 		ClusterID:        16,
 		Shards:           []uint16{64},
 		PeerExchange:     true,
-		Discv5UdpPort:    9002,
+		Discv5UdpPort:    9012,
 		TcpPort:          60012,
 		PeerExchangeNode: serverNodeMa[0].String(),
 	}
@@ -318,8 +321,8 @@ func TestDnsDiscover(t *testing.T) {
 		LogLevel:      "DEBUG",
 		ClusterID:     16,
 		Shards:        []uint16{64},
-		Discv5UdpPort: 9040,
-		TcpPort:       60040,
+		Discv5UdpPort: 9020,
+		TcpPort:       60020,
 	}
 	node, err := New(&nodeWakuConfig, logger.Named("node"))
 	require.NoError(t, err)
@@ -347,8 +350,8 @@ func TestDial(t *testing.T) {
 		Discv5Discovery: false,
 		ClusterID:       16,
 		Shards:          []uint16{64},
-		Discv5UdpPort:   9020,
-		TcpPort:         60020,
+		Discv5UdpPort:   9030,
+		TcpPort:         60030,
 	}
 
 	dialerNode, err := New(&dialerNodeWakuConfig, logger.Named("dialerNode"))
@@ -363,8 +366,8 @@ func TestDial(t *testing.T) {
 		Discv5Discovery: false,
 		ClusterID:       16,
 		Shards:          []uint16{64},
-		Discv5UdpPort:   9021,
-		TcpPort:         60021,
+		Discv5UdpPort:   9031,
+		TcpPort:         60031,
 	}
 	receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
 	require.NoError(t, err)
@@ -395,3 +398,85 @@ func TestDial(t *testing.T) {
 	require.NoError(t, dialerNode.Stop())
 	require.NoError(t, receiverNode.Stop())
 }
+
+func TestRelay(t *testing.T) {
+	logger, err := zap.NewDevelopment()
+	require.NoError(t, err)
+
+	// start node that will send the message
+	senderNodeWakuConfig := WakuConfig{
+		Relay:           true,
+		LogLevel:        "DEBUG",
+		Discv5Discovery: false,
+		ClusterID:       16,
+		Shards:          []uint16{64},
+		Discv5UdpPort:   9040,
+		TcpPort:         60040,
+	}
+
+	senderNode, err := New(&senderNodeWakuConfig, logger.Named("senderNode"))
+	require.NoError(t, err)
+	require.NoError(t, senderNode.Start())
+	time.Sleep(1 * time.Second)
+
+	// start node that will receive the message
+	receiverNodeWakuConfig := WakuConfig{
+		Relay:           true,
+		LogLevel:        "DEBUG",
+		Discv5Discovery: false,
+		ClusterID:       16,
+		Shards:          []uint16{64},
+		Discv5UdpPort:   9041,
+		TcpPort:         60041,
+	}
+	receiverNode, err := New(&receiverNodeWakuConfig, logger.Named("receiverNode"))
+	require.NoError(t, err)
+	require.NoError(t, receiverNode.Start())
+	time.Sleep(1 * time.Second)
+	receiverMultiaddr, err := receiverNode.ListenAddresses()
+	require.NoError(t, err)
+	require.NotNil(t, receiverMultiaddr)
+
+	// Dial so they become peers
+	err = senderNode.DialPeer(receiverMultiaddr[0])
+	require.NoError(t, err)
+	time.Sleep(1 * time.Second)
+	// Check that both nodes now have one connected peer
+	senderPeerCount, err := senderNode.PeerCount()
+	require.NoError(t, err)
+	require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer")
+	receiverPeerCount, err := receiverNode.PeerCount()
+	require.NoError(t, err)
+	require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
+
+	message := &pb.WakuMessage{
+		Payload:      []byte{1, 2, 3, 4, 5, 6},
+		ContentTopic: "test-content-topic",
+		Version:      proto.Uint32(0),
+		Timestamp:    proto.Int64(time.Now().Unix()),
+	}
+	// send message
+	pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	defer cancel()
+	senderNode.RelayPublish(ctx, message, pubsubTopic)
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	// Wait to receive message
+	select {
+	case envelope := <-receiverNode.node.MsgChan:
+		require.NotNil(t, envelope, "Envelope should be received")
+		require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match")
+		require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match")
+		wg.Done()
+	case <-time.After(10 * time.Second):
+		t.Fatal("Timeout: No message received within 10 seconds")
+	}
+	wg.Wait()
+
+	// Stop nodes
+	require.NoError(t, senderNode.Stop())
+	require.NoError(t, receiverNode.Stop())
+}
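
A minimal usage sketch of the two helpers this patch introduces, FormatWakuRelayTopic and (*Waku).RelayPublish, outside of a test. Everything below is illustrative and not part of the patch: the function name publishExample, the "hello" payload, and the "example-content-topic" content topic are placeholders, and it assumes a *Waku instance that has already been created with New and started with Start, configured for cluster 16 / shard 64 like the nodes in the tests above.

package waku

import (
	"context"
	"time"

	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
	"google.golang.org/protobuf/proto"
)

// publishExample is an illustrative sketch (not part of the patch). It builds
// the pubsub topic for the node's cluster/shard and publishes one message
// over relay, returning any publish error.
func publishExample(ctx context.Context, node *Waku) error {
	// Same shard notation the test uses: "/waku/2/rs/16/64".
	pubsubTopic := FormatWakuRelayTopic(16, 64)

	msg := &pb.WakuMessage{
		Payload:      []byte("hello"),
		ContentTopic: "example-content-topic",
		Version:      proto.Uint32(0),
		Timestamp:    proto.Int64(time.Now().Unix()),
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// RelayPublish returns the message hash nwaku assigned; only the error
	// is checked here.
	_, err := node.RelayPublish(ctx, msg, pubsubTopic)
	return err
}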