From 47b49f966b334dacea1e747f7075f95a5419cccc Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 26 Feb 2025 13:47:34 +0200 Subject: [PATCH 1/7] Adding store files --- waku/nwaku_test_utils.go | 40 ++ waku/store_test.go | 1028 ++++++++++++++++++++++++++++++++++++++ waku/test_data.go | 84 +++- 3 files changed, 1151 insertions(+), 1 deletion(-) create mode 100644 waku/store_test.go diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index 5b1bc96..4d80b1c 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -12,6 +12,7 @@ import ( "time" "github.com/cenkalti/backoff/v3" + "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/waku-go-bindings/waku/common" "google.golang.org/protobuf/proto" @@ -215,3 +216,42 @@ func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { } return nil } + +func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.StoreQueryRequest) (*common.StoreQueryResponse, error) { + Debug("Starting store query request") + + if storeRequest == nil { + Debug("Using DefaultStoreQueryRequest") + storeRequest = &DefaultStoreQueryRequest + } + + storeMultiaddr, err := storeNode.ListenAddresses() + if err != nil { + Error("Failed to retrieve listen addresses for store node: %v", err) + return nil, err + } + + if len(storeMultiaddr) == 0 { + Error("Store node has no available listen addresses") + return nil, fmt.Errorf("store node has no available listen addresses") + } + + storeNodeAddrInfo, err := peer.AddrInfoFromString(storeMultiaddr[0].String()) + if err != nil { + Error("Failed to convert store node address to AddrInfo: %v", err) + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Querying store node for messages") + res, err := n.StoreQuery(ctx, storeRequest, *storeNodeAddrInfo) + if err != nil { + Error("StoreQuery failed: %v", err) + return nil, err + } + + Debug("Store query successful, retrieved %d messages", len(*res.Messages)) + return res, nil +} diff --git a/waku/store_test.go b/waku/store_test.go new file mode 100644 index 0000000..4e3ce68 --- /dev/null +++ b/waku/store_test.go @@ -0,0 +1,1028 @@ +package waku + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + + "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" +) + +func TestStoreQuery3Nodes(t *testing.T) { + Debug("Starting test to verify store query from a peer using direct peer connections") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + Debug("Publishing message from Node1 using RelayPublish") + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte("test-message"), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + queryTimestamp := proto.Int64(time.Now().UnixNano()) + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting for message delivery to Node2") + time.Sleep(2 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Node3 querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + res, err := node3.GetStoredMessages(node2, storeQueryRequest) + var storedMessages = *res.Messages + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotEmpty(t, storedMessages, "Expected at least one stored message") + Debug("Verifying stored message matches the published message") + require.Equal(t, message.Payload, storedMessages[0].WakuMessage.Payload, "Stored message payload does not match") + Debug("Test successfully verified store query from a peer using direct peer connections") +} + +func TestStoreQueryMultipleMessages(t *testing.T) { + Debug("Starting test to verify store query with multiple messages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 50 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying stored messages from Node2") + res, err := node3.GetStoredMessages(node2, nil) + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotNil(t, res.Messages, "Expected stored messages but received nil") + + storedMessages := *res.Messages + + require.Len(t, storedMessages, numMessages, "Expected to retrieve exactly 50 messages") + + Debug("Verifying stored message hashes match sent message hashes") + var receivedHashes []common.MessageHash + for _, storedMsg := range storedMessages { + receivedHashes = append(receivedHashes, storedMsg.MessageHash) + } + + require.ElementsMatch(t, sentHashes, receivedHashes, "Sent and received message hashes do not match") + + Debug("Test successfully verified store query with multiple messages") +} + +func TestStoreQueryWith5Pagination(t *testing.T) { + Debug("Starting test to verify store query with pagination") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 10 + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + _, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying stored messages from Node2 with PaginationLimit = 5") + storeRequest := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(5)), + PaginationForward: true, + } + + res, err := node3.GetStoredMessages(node2, &storeRequest) + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotNil(t, res.Messages, "Expected stored messages but received nil") + + storedMessages := *res.Messages + + require.Len(t, storedMessages, 5, "Expected to retrieve exactly 5 messages due to pagination limit") + + Debug("Test successfully verified store query with pagination limit") +} + +func TestStoreQueryWithPaginationMultiplePages(t *testing.T) { + Debug("Starting test to verify store query with pagination across multiple pages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 8 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying first page of stored messages from Node2") + storeRequest1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: true, + } + + res1, err := node3.GetStoredMessages(node2, &storeRequest1) + require.NoError(t, err, "Failed to retrieve first page of stored messages from Node2") + require.NotNil(t, res1.Messages, "Expected stored messages but received nil") + + storedMessages1 := *res1.Messages + require.Len(t, storedMessages1, 5, "Expected to retrieve exactly 5 messages from first query") + + for i := 0; i < 5; i++ { + require.Equal(t, sentHashes[i], storedMessages1[i].MessageHash, "Message order mismatch in first query") + } + + Debug("Node3 querying second page of stored messages from Node2") + storeRequest2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: true, + PaginationCursor: &res1.PaginationCursor, + } + + res2, err := node3.GetStoredMessages(node2, &storeRequest2) + require.NoError(t, err, "Failed to retrieve second page of stored messages from Node2") + require.NotNil(t, res2.Messages, "Expected stored messages but received nil") + + storedMessages2 := *res2.Messages + require.Len(t, storedMessages2, 3, "Expected to retrieve exactly 3 messages from second query") + + for i := 0; i < 3; i++ { + require.Equal(t, sentHashes[i+5], storedMessages2[i].MessageHash, "Message order mismatch in second query") + } + + Debug("Test successfully verified store query pagination across multiple pages") +} + +func TestStoreQueryWithPaginationReverseOrder(t *testing.T) { + Debug("Starting test to verify store query with pagination in reverse order") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 8 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying first page of stored messages from Node2 (Newest first)") + storeRequest1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: false, + } + + res1, err := node3.GetStoredMessages(node2, &storeRequest1) + require.NoError(t, err, "Failed to retrieve first page of stored messages from Node2") + require.NotNil(t, res1.Messages, "Expected stored messages but received nil") + + storedMessages1 := *res1.Messages + require.Len(t, storedMessages1, 5, "Expected to retrieve exactly 5 messages from first query") + + for i := 0; i < 5; i++ { + require.Equal(t, sentHashes[numMessages-1-i], storedMessages1[i].MessageHash, "Message order mismatch in first query") + } + + Debug("Node3 querying second page of stored messages from Node2") + storeRequest2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: false, + PaginationCursor: &res1.PaginationCursor, + } + + res2, err := node3.GetStoredMessages(node2, &storeRequest2) + require.NoError(t, err, "Failed to retrieve second page of stored messages from Node2") + require.NotNil(t, res2.Messages, "Expected stored messages but received nil") + + storedMessages2 := *res2.Messages + require.Len(t, storedMessages2, 3, "Expected to retrieve exactly 3 messages from second query") + + for i := 0; i < 3; i++ { + require.Equal(t, sentHashes[numMessages-6-i], storedMessages2[i].MessageHash, "Message order mismatch in second query") + } + + Debug("Test successfully verified store query pagination in reverse order") +} + +func TestQueryFailwhenNoStorePeer(t *testing.T) { + Debug("Starting test to verify store query failure when node2 has no store") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = false + + Debug("Creating Node2 with Relay enabled but Store disabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying that Node3 fails to retrieve stored messages since Node2 has store disabled") + storedMessages, err := node3.GetStoredMessages(node2, nil) + require.Error(t, err, "Expected Node3's store query to fail because Node2 has store disabled") + require.Empty(t, storedMessages, "Expected no messages in store for Node3") + + Debug("Test successfully verified that store query fails when Node2 does not store messages") +} + +func TestQueryFailWithIncorrectStaticNode(t *testing.T) { + Debug("Starting test to verify store query failure when Node3 has an incorrect static node address") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node1Address, err := node1.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + node2Config.Staticnodes = []string{node1Address[0].String()} + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node2Address, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node2") + + var incorrectAddress = node2Address[0].String()[:len(node2Address[0].String())-10] + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Staticnodes = []string{incorrectAddress} + + Debug("Original Node2 Address: %s", node2Address[0].String()) + Debug("Modified Node2 Address: %s", incorrectAddress) + + Debug("Creating Node3 with an incorrect static node address") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Sender Node1 is publishing a message") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying that Node3 fails to retrieve stored messages due to incorrect static node") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + storedmsgs, err := node3.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Expected Node3's store query to fail due to incorrect static node") + require.Nil(t, (*storedmsgs.Messages)[0].WakuMessage, "Expected no messages in store for Node3") + + Debug("Test successfully verified store query failure due to incorrect static node configuration") +} + +func TestStoreQueryWithoutData(t *testing.T) { + Debug("Starting test to verify store query returns only message hashes when IncludeData is false") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true // Enable store on Node2 + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node3 with IncludeData = false") + storeQueryRequest := &common.StoreQueryRequest{ + IncludeData: false, + } + + storedmsgs, err := node3.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.NotNil(t, storedmsgs.Messages, "Expected store response to contain message hashes") + //require.Len(t, *storedmsgs.Messages, 1, "Expected exactly one stored message") + + // Access the first message + // firstMessage := (*storedmsgs.Messages)[0] + + firstMessage := (*storedmsgs.Messages)[0] + require.Nil(t, firstMessage.WakuMessage, "Expected message payload to be empty when IncludeData is false") + require.NotEmpty(t, (*storedmsgs.Messages)[0].MessageHash, "Expected message hash to be present") + Debug("Queried message hash: %s", (*storedmsgs.Messages)[0].MessageHash) + + Debug("Test successfully verified that store query returns only message hashes when IncludeData is false") +} + +func TestStoreQueryWithWrongContentTopic(t *testing.T) { + Debug("Starting test to verify store query fails when using an incorrect content topic and an old timestamp") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a message with a correct content topic") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node3 with an incorrect content topic and an old timestamp") + storeQueryRequest := &common.StoreQueryRequest{ + ContentTopics: &[]string{"incorrect-content-topic"}, + TimeStart: queryTimestamp, + } + + storedmsgs, _ := node3.GetStoredMessages(node2, storeQueryRequest) + //require.Error(t, err, "Expected error when querying with an incorrect content topic and old timestamp") + require.Nil(t, (*storedmsgs.Messages)[0], "Expected no messages to be returned for incorrect content topic and timestamp") + //Debug("Queried message hash: %s", (*storedmsgs.Messages)[0].MessageHash) + Debug("Test successfully verified that store query fails when using an incorrect content topic and an old timestamp") +} + +func TestCheckStoredMSGsEphemeralTrue(t *testing.T) { + Debug("Starting test to verify ephemeral messages are not stored") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing an ephemeral message") + message := node1.CreateMessage() + ephemeralTrue := true + message.Ephemeral = &ephemeralTrue + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 0, len(*storedmsgs.Messages), "Expected no stored messages for ephemeral messages") + + Debug("Test successfully verified that ephemeral messages are not stored") +} + +func TestCheckStoredMSGsEphemeralFalse(t *testing.T) { + Debug("Starting test to verify non-ephemeral messages are stored") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a non-ephemeral message") + message := node1.CreateMessage() + ephemeralFalse := false + message.Ephemeral = &ephemeralFalse + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 1, len(*storedmsgs.Messages), "Expected exactly one stored message") + + Debug("Test finished successfully ") +} + +func TestCheckLegacyStore(t *testing.T) { + Debug("Starting test ") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 ") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + node2Config.LegacyStore = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 1, len(*storedmsgs.Messages), "Expected exactly one stored message") + + Debug("Test finished successfully ") + +} + +func TestStoredMessagesWithVDifferentPayloads(t *testing.T) { + Debug("Starting test ") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 ") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, pLoad := range SAMPLE_INPUTS { + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing message with payload: %s", pLoad.Value) + message := node1.CreateMessage() + message.Payload = []byte(pLoad.Value) + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + retrievedMessage := (*storedmsgs.Messages)[0] + require.Equal(t, pLoad.Value, string(retrievedMessage.WakuMessage.Payload), "Expected WakuMessage but got nil") + Debug("Payload matches expected %s", string(retrievedMessage.WakuMessage.Payload)) + } + + Debug("Test finished successfully ") +} + +func TestStoredMessagesWithDifferentContentTopics(t *testing.T) { + Debug("Starting test for different content topics") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, contentTopic := range CONTENT_TOPICS_DIFFERENT_SHARDS { + + Debug("Node1 is publishing message with content topic: %s", contentTopic) + queryTimestamp := proto.Int64(time.Now().UnixNano()) + message := node1.CreateMessage() + message.ContentTopic = contentTopic + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + ContentTopics: &[]string{contentTopic}, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + require.Equal(t, contentTopic, (*storedmsgs.Messages)[0].WakuMessage.ContentTopic, "Stored message content topic does not match expected") + Debug("Veified content topic %s ", (*storedmsgs.Messages)[0].WakuMessage.ContentTopic) + } + + Debug("Test finished successfully") +} + +func TestStoredMessagesWithDifferentPubsubTopics(t *testing.T) { + Debug("Starting test for different pubsub topics") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, pubsubTopic := range PUBSUB_TOPICS_STORE { + + Debug("Node1 is publishing message on pubsub topic: %s", pubsubTopic) + node1.RelaySubscribe(pubsubTopic) + node2.RelaySubscribe(pubsubTopic) + queryTimestamp := proto.Int64(time.Now().UnixNano()) + var msg = node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(pubsubTopic, msg) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + PubsubTopic: pubsubTopic, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + require.Equal(t, pubsubTopic, (*storedmsgs.Messages)[0].PubsubTopic, "Stored message pubsub topic does not match expected") + } + + Debug("Test finished successfully") +} diff --git a/waku/test_data.go b/waku/test_data.go index 67125d1..e8c1a60 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -1,16 +1,20 @@ package waku import ( + "fmt" "time" "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" ) var DefaultWakuConfig common.WakuConfig +var DefaultStoreQueryRequest common.StoreQueryRequest +var DEFAULT_CLUSTER_ID = 16 func init() { - DefaultWakuConfig = WakuConfig{ + DefaultWakuConfig = common.WakuConfig{ Relay: false, LogLevel: "DEBUG", Discv5Discovery: true, @@ -23,6 +27,14 @@ func init() { Discv5UdpPort: 0, TcpPort: 0, } + + DefaultStoreQueryRequest = common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(50)), + PaginationForward: true, + TimeStart: proto.Int64(time.Now().Add(-5 * time.Minute).UnixNano()), // 5 mins before now + } } const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connect to another node @@ -33,3 +45,73 @@ var ( MinPort = 1024 // Minimum allowable port (exported) MaxPort = 65535 // Maximum allowable port (exported) ) + +var SAMPLE_INPUTS = []struct { + Description string + Value string +}{ + {"A simple string", "Hello World!"}, + {"An integer", "1234567890"}, + {"A dictionary", `{"key": "value"}`}, + {"Chinese characters", "这是一些中文"}, + {"Emojis", "🚀🌟✨"}, + {"Lorem ipsum text", "Lorem ipsum dolor sit amet"}, + {"HTML content", "Hello"}, + {"Cyrillic characters", "\u041f\u0440\u0438\u0432\u0435\u0442"}, + {"Base64 encoded string", "Base64==dGVzdA=="}, + {"Binary data", "d29ya2luZyB3aXRoIGJpbmFyeSBkYXRh: \x50\x51"}, + {"Special characters with whitespace", "\t\nSpecial\tCharacters\n"}, + {"Boolean false as a string", "False"}, + {"A float number", "3.1415926535"}, + {"A list", "[1, 2, 3, 4, 5]"}, + {"Hexadecimal number as a string", "0xDEADBEEF"}, + {"Email format", "user@example.com"}, + {"URL format", "http://example.com"}, + {"Date and time in ISO format", "2023-11-01T12:00:00Z"}, + {"String with escaped quotes", `"Escaped" \"quotes\"`}, + {"A regular expression", "Regular expression: ^[a-z0-9_-]{3,16}$"}, + {"A very long string", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}, + {"A JSON string", `{"name": "John", "age": 30, "city": "New York"}`}, + {"A Unix path", "/usr/local/bin"}, + {"A Windows path", "C:\\Windows\\System32"}, + {"An SQL query", "SELECT * FROM users WHERE id = 1;"}, + {"JavaScript code snippet", "function test() { console.log('Hello World'); }"}, + {"A CSS snippet", "body { background-color: #fff; }"}, + {"A Python one-liner", "print('Hello World')"}, + {"An IP address", "192.168.1.1"}, + {"A domain name", "www.example.com"}, + {"A user agent string", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}, + {"A credit card number", "1234-5678-9012-3456"}, + {"A phone number", "+1234567890"}, + {"A UUID", "123e4567-e89b-12d3-a456-426614174000"}, + {"A hashtag", "#helloWorld"}, + {"A Twitter handle", "@username"}, + {"A password", "P@ssw0rd!"}, + {"A date in common format", "01/11/2023"}, + {"A time string", "12:00:00"}, + {"A mathematical equation", "E = mc^2"}, +} + +var PUBSUB_TOPICS_STORE = []string{ + + fmt.Sprintf("/waku/2/rs/%d/0", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/1", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/2", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/3", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/4", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/5", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/6", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/7", DEFAULT_CLUSTER_ID), + //fmt.Sprintf("/waku/2/rs/%d/8", DEFAULT_CLUSTER_ID), +} + +var CONTENT_TOPICS_DIFFERENT_SHARDS = []string{ + "/myapp/1/latest/proto", + "/waku/2/content/test.js", + "/app/22/sometopic/someencoding", + "/toychat/2/huilong/proto", + "/statusim/1/community/cbor", + "/app/27/sometopic/someencoding", + "/app/29/sometopic/someencoding", + "/app/20/sometopic/someencoding", +} From 48c83a3a03df92b5a67e79cd970672734ae81bce Mon Sep 17 00:00:00 2001 From: aya Date: Wed, 26 Feb 2025 14:01:44 +0200 Subject: [PATCH 2/7] commenting shard ID --- waku/test_data.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/waku/test_data.go b/waku/test_data.go index e8c1a60..7552e23 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -19,13 +19,13 @@ func init() { LogLevel: "DEBUG", Discv5Discovery: true, ClusterID: 16, - Shards: []uint16{64}, - PeerExchange: false, - Store: false, - Filter: false, - Lightpush: false, - Discv5UdpPort: 0, - TcpPort: 0, + //Shards: []uint16{64}, + PeerExchange: false, + Store: false, + Filter: false, + Lightpush: false, + Discv5UdpPort: 0, + TcpPort: 0, } DefaultStoreQueryRequest = common.StoreQueryRequest{ @@ -102,7 +102,7 @@ var PUBSUB_TOPICS_STORE = []string{ fmt.Sprintf("/waku/2/rs/%d/5", DEFAULT_CLUSTER_ID), fmt.Sprintf("/waku/2/rs/%d/6", DEFAULT_CLUSTER_ID), fmt.Sprintf("/waku/2/rs/%d/7", DEFAULT_CLUSTER_ID), - //fmt.Sprintf("/waku/2/rs/%d/8", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/8", DEFAULT_CLUSTER_ID), } var CONTENT_TOPICS_DIFFERENT_SHARDS = []string{ From bab227d656d77a6a7f733609fb4e26c1f871a5b4 Mon Sep 17 00:00:00 2001 From: aya Date: Sun, 2 Mar 2025 11:59:35 +0200 Subject: [PATCH 3/7] ignore temp files --- .gitignore | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.gitignore b/.gitignore index aed6ae9..9eba29d 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,9 @@ go.work # Generated dependencies and cache third_party nimcache + +waku/store.sqlite3 + +waku/store.sqlite3-shm + +waku/store.sqlite3-wal From 59a2e543913859b0cb03ce4b44ea48d48ceb1c88 Mon Sep 17 00:00:00 2001 From: aya Date: Sun, 2 Mar 2025 16:44:30 +0200 Subject: [PATCH 4/7] Add store tests --- waku/nwaku_test_utils.go | 2 +- waku/store_test.go | 260 +++++++++++++++++++++++++++++++++++++++ waku/test_data.go | 15 +-- 3 files changed, 269 insertions(+), 8 deletions(-) diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index 4d80b1c..7888934 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -95,7 +95,7 @@ func (n *WakuNode) CreateMessage(customMessage ...*pb.WakuMessage) *pb.WakuMessa Debug("Using default message format on node %s", n.nodeName) defaultMessage := &pb.WakuMessage{ Payload: []byte("This is a default Waku message payload"), - ContentTopic: "test-content-topic", + ContentTopic: DefaultContentTopic, Version: proto.Uint32(0), Timestamp: proto.Int64(time.Now().UnixNano()), } diff --git a/waku/store_test.go b/waku/store_test.go index 4e3ce68..ce59e49 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -1026,3 +1026,263 @@ func TestStoredMessagesWithDifferentPubsubTopics(t *testing.T) { Debug("Test finished successfully") } + +func TestStoredMessagesWithMetaField(t *testing.T) { + Debug("Starting test ") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Node1 is publishing a message with meta field set") + message := node1.CreateMessage() + message.Payload = []byte("payload") + message.Meta = []byte([]byte("hello")) + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + + retrievedMessage := (*storedmsgs.Messages)[0].WakuMessage + require.Equal(t, string(message.Payload), string(retrievedMessage.Payload), "Payload does not match") + require.Equal(t, string(message.Meta), string(retrievedMessage.Meta), "Meta field does not match expected Base64-encoded payload") + + Debug("Test finished successfully ") +} + +func TestStoredMessagesWithVersionField(t *testing.T) { + Debug("Starting test") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + version := uint32(2) + + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Node1 is publishing a message with version field set") + message := node1.CreateMessage() + message.Version = &version + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + + retrievedMessage := (*storedmsgs.Messages)[0].WakuMessage + require.Equal(t, version, *retrievedMessage.Version, "Version field does not match expected value") + + Debug("Test finished successfully ") +} + +func TestStoredDublicateMessage(t *testing.T) { + Debug("Starting test") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Node1 is publishing two identical messages") + _, err = node1.RelayPublishNoCTX(DefaultPubsubTopic, node1.CreateMessage()) + require.NoError(t, err, "Failed to publish first message") + + _, err = node1.RelayPublishNoCTX(DefaultPubsubTopic, node1.CreateMessage()) + require.NoError(t, err, "Failed to publish second message") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + + require.Equal(t, 1, len(*storedmsgs.Messages), "Expected only one stored message since identical messages should be deduplicated") + + Debug("Test finished successfully") +} + +func TestQueryStoredMessagesWithoutPublishing(t *testing.T) { + Debug("Starting test: Querying stored messages without publishing any") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Querying stored messages from Node2 without publishing any message") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Empty(t, *storedmsgs.Messages, "Expected no stored messages") + + Debug("Test finished successfully") +} + +func TestQueryStoredMessagesWithWrongHash(t *testing.T) { + Debug("Starting test: Querying stored messages with a slightly modified message hash") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Node1 is publishing a message") + message := node1.CreateMessage() + message.Payload = []byte("Test message for hash modification") + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("MOdify the original message hash: %s", msgHash) + modifiedHash := common.MessageHash(msgHash[:len(msgHash)-2] + "da") + + Debug("Querying stored messages from Node2 using a modified hash: %s", modifiedHash) + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + MessageHashes: &[]common.MessageHash{modifiedHash}, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2 with modified hash") + require.Empty(t, *storedmsgs.Messages, "Expected no stored messages with a modified hash") + + Debug("Test finished successfully: Query with a modified hash returned 0 messages as expected") +} diff --git a/waku/test_data.go b/waku/test_data.go index 7552e23..6f62400 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -19,13 +19,13 @@ func init() { LogLevel: "DEBUG", Discv5Discovery: true, ClusterID: 16, - //Shards: []uint16{64}, - PeerExchange: false, - Store: false, - Filter: false, - Lightpush: false, - Discv5UdpPort: 0, - TcpPort: 0, + Shards: []uint16{64}, + PeerExchange: false, + Store: false, + Filter: false, + Lightpush: false, + Discv5UdpPort: 0, + TcpPort: 0, } DefaultStoreQueryRequest = common.StoreQueryRequest{ @@ -41,6 +41,7 @@ const ConnectPeerTimeout = 10 * time.Second //default timeout for node to connec const DefaultTimeOut = 3 * time.Second var DefaultPubsubTopic = "/waku/2/rs/16/64" +var DefaultContentTopic = "/test/1/default/proto" var ( MinPort = 1024 // Minimum allowable port (exported) MaxPort = 65535 // Maximum allowable port (exported) From 5bde8e1b3791ce9b848bc1ab5824cf321fd03208 Mon Sep 17 00:00:00 2001 From: aya Date: Mon, 3 Mar 2025 18:04:10 +0200 Subject: [PATCH 5/7] Change duplicate test --- waku/store_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/waku/store_test.go b/waku/store_test.go index ce59e49..62674d7 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -1167,12 +1167,12 @@ func TestStoredDublicateMessage(t *testing.T) { require.NoError(t, err, "Failed to connect Node2 to Node1") queryTimestamp := proto.Int64(time.Now().UnixNano()) - + var msg = node1.CreateMessage() Debug("Node1 is publishing two identical messages") - _, err = node1.RelayPublishNoCTX(DefaultPubsubTopic, node1.CreateMessage()) + _, err = node1.RelayPublishNoCTX(DefaultPubsubTopic, msg) require.NoError(t, err, "Failed to publish first message") - _, err = node1.RelayPublishNoCTX(DefaultPubsubTopic, node1.CreateMessage()) + _, err = node2.RelayPublishNoCTX(DefaultPubsubTopic, msg) require.NoError(t, err, "Failed to publish second message") Debug("Querying stored messages from Node2 using Node1") From c155e640784b4aa7b04202a189312b24d5f90726 Mon Sep 17 00:00:00 2001 From: AYAHASSAN287 <49167455+AYAHASSAN287@users.noreply.github.com> Date: Thu, 6 Mar 2025 15:03:19 +0200 Subject: [PATCH 6/7] Update waku/nwaku_test_utils.go Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> --- waku/nwaku_test_utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index 7888934..7e319d7 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -233,7 +233,7 @@ func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.S if len(storeMultiaddr) == 0 { Error("Store node has no available listen addresses") - return nil, fmt.Errorf("store node has no available listen addresses") + return nil, errors.New("store node has no available listen addresses") } storeNodeAddrInfo, err := peer.AddrInfoFromString(storeMultiaddr[0].String()) From 2e8587deb339a69963e81cebac4c83903a0d2334 Mon Sep 17 00:00:00 2001 From: aya Date: Sun, 9 Mar 2025 12:13:01 +0200 Subject: [PATCH 7/7] Fix review comments --- waku/store_test.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/waku/store_test.go b/waku/store_test.go index 62674d7..b644fe1 100644 --- a/waku/store_test.go +++ b/waku/store_test.go @@ -148,7 +148,7 @@ func TestStoreQueryMultipleMessages(t *testing.T) { } Debug("Waiting for message delivery to Node2") - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) Debug("Node3 querying stored messages from Node2") res, err := node3.GetStoredMessages(node2, nil) @@ -230,7 +230,7 @@ func TestStoreQueryWith5Pagination(t *testing.T) { } Debug("Waiting for message delivery to Node2") - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) Debug("Node3 querying stored messages from Node2 with PaginationLimit = 5") storeRequest := common.StoreQueryRequest{ @@ -313,7 +313,7 @@ func TestStoreQueryWithPaginationMultiplePages(t *testing.T) { } Debug("Waiting for message delivery to Node2") - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) Debug("Node3 querying first page of stored messages from Node2") storeRequest1 := common.StoreQueryRequest{ @@ -419,7 +419,7 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) { } Debug("Waiting for message delivery to Node2") - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) Debug("Node3 querying first page of stored messages from Node2 (Newest first)") storeRequest1 := common.StoreQueryRequest{ @@ -463,7 +463,7 @@ func TestStoreQueryWithPaginationReverseOrder(t *testing.T) { Debug("Test successfully verified store query pagination in reverse order") } -func TestQueryFailwhenNoStorePeer(t *testing.T) { +func TestQueryFailWhenNoStorePeer(t *testing.T) { Debug("Starting test to verify store query failure when node2 has no store") node1Config := DefaultWakuConfig @@ -633,10 +633,6 @@ func TestStoreQueryWithoutData(t *testing.T) { storedmsgs, err := node3.GetStoredMessages(node2, storeQueryRequest) require.NoError(t, err, "Failed to query store messages from Node2") require.NotNil(t, storedmsgs.Messages, "Expected store response to contain message hashes") - //require.Len(t, *storedmsgs.Messages, 1, "Expected exactly one stored message") - - // Access the first message - // firstMessage := (*storedmsgs.Messages)[0] firstMessage := (*storedmsgs.Messages)[0] require.Nil(t, firstMessage.WakuMessage, "Expected message payload to be empty when IncludeData is false") @@ -702,9 +698,7 @@ func TestStoreQueryWithWrongContentTopic(t *testing.T) { } storedmsgs, _ := node3.GetStoredMessages(node2, storeQueryRequest) - //require.Error(t, err, "Expected error when querying with an incorrect content topic and old timestamp") require.Nil(t, (*storedmsgs.Messages)[0], "Expected no messages to be returned for incorrect content topic and timestamp") - //Debug("Queried message hash: %s", (*storedmsgs.Messages)[0].MessageHash) Debug("Test successfully verified that store query fails when using an incorrect content topic and an old timestamp") } @@ -1138,7 +1132,7 @@ func TestStoredMessagesWithVersionField(t *testing.T) { Debug("Test finished successfully ") } -func TestStoredDublicateMessage(t *testing.T) { +func TestStoredDuplicateMessage(t *testing.T) { Debug("Starting test") node1Config := DefaultWakuConfig