diff --git a/waku/common/store.go b/waku/common/store.go index 79fb443..0a44c54 100644 --- a/waku/common/store.go +++ b/waku/common/store.go @@ -3,7 +3,7 @@ package common type StoreQueryRequest struct { RequestId string `json:"request_id"` IncludeData bool `json:"include_data"` - PubsubTopic *string `json:"pubsub_topic,omitempty"` + PubsubTopic string `json:"pubsub_topic,omitempty"` ContentTopics []string `json:"content_topics,omitempty"` TimeStart *int64 `json:"time_start,omitempty"` TimeEnd *int64 `json:"time_end,omitempty"` diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 98da03c..a010cb1 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -694,49 +694,108 @@ func TestHash(t *testing.T) { require.NoError(t, err) require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") - message := &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5, 6}, - ContentTopic: "test-content-topic", - Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().UnixNano()), - } - // send message + // Send 8 messages + numMessages := 8 + paginationLimit := 5 + timeStart := proto.Int64(time.Now().UnixNano()) + hashes := []common.MessageHash{} pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0]) - ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel2() - fmt.Println("----------- publishing message ------------") - hash, err := senderNode.RelayPublish(ctx2, message, pubsubTopic) - require.NoError(t, err) - fmt.Println("----------- RelayPublish returned hash: ", hash) + for i := 0; i < numMessages; i++ { + message := &pb.WakuMessage{ + Payload: []byte{byte(i)}, // Include message number in payload + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } - // Wait to receive message - select { - case envelope := <-receiverNode.MsgChan: - fmt.Println("------- received envelope: ", envelope) - require.NotNil(t, envelope, "Envelope should be received") - require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match") - require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match") - case <-time.After(10 * time.Second): - t.Fatal("Timeout: No message received within 10 seconds") + ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel2() + + fmt.Printf("----------- publishing message %d ------------\n", i) + hash, err := senderNode.RelayPublish(ctx2, message, pubsubTopic) + require.NoError(t, err) + hashes = append(hashes, hash) + fmt.Printf("----------- RelayPublish %d returned hash: %s\n", i, hash) + } + fmt.Println("-------- sent hashes: ", hashes) + + // Wait to receive all 8 messages + receivedCount := 0 + receivedMessages := make(map[byte]bool) + + // Use a timeout for the entire receive operation + timeoutChan := time.After(10 * time.Second) + + for receivedCount < numMessages { + select { + case envelope := <-receiverNode.MsgChan: + fmt.Printf("------- received envelope: %v\n", envelope) + require.NotNil(t, envelope, "Envelope should be received") + + payload := envelope.Message().Payload + msgNum := payload[0] + + // Check if we've already received this message number + if !receivedMessages[msgNum] { + receivedMessages[msgNum] = true + receivedCount++ + fmt.Printf("Received message %d (total: %d)\n", msgNum, receivedCount) + } + + require.Equal(t, "test-content-topic", envelope.Message().ContentTopic, "Content topic should match") + + case <-timeoutChan: + t.Fatalf("Timeout: Only received %d messages out of 8 within 10 seconds", receivedCount) + } + } + + // Verify we received all messages + for i := 0; i < numMessages; i++ { + require.True(t, receivedMessages[byte(i)], fmt.Sprintf("Message %d was not received", i)) } // Now send store query - ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) - defer cancel3() - - storeReq := common.StoreQueryRequest{ - IncludeData: true, - ContentTopics: []string{"test-content-topic"}, + storeReq1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: []string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(paginationLimit)), + PaginationForward: true, + TimeStart: timeStart, } fmt.Println("------------ storeNode multiaddr: ", receiverMultiaddr[0].String()) storeNodeAddrInfo, err := peer.AddrInfoFromString(receiverMultiaddr[0].String()) require.NoError(t, err) - res, err := senderNode.StoreQuery(ctx3, &storeReq, *storeNodeAddrInfo) + ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel3() + + res1, err := senderNode.StoreQuery(ctx3, &storeReq1, *storeNodeAddrInfo) require.NoError(t, err) - fmt.Printf("%+v\n", res) + fmt.Printf("%+v\n", res1) + + storedMessages := res1.Messages + for i := 0; i < paginationLimit; i++ { + require.True(t, storedMessages[i].MessageHash == hashes[i], fmt.Sprintf("Stored message does not match received message for index %d", i)) + } + + // Now let's query the second page + storeReq2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: []string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(paginationLimit)), + PaginationForward: true, + TimeStart: timeStart, + PaginationCursor: res1.PaginationCursor, + } + + ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel4() + + res2, err := senderNode.StoreQuery(ctx4, &storeReq2, *storeNodeAddrInfo) + require.NoError(t, err) + fmt.Printf("%+v\n", res2) // Stop nodes require.NoError(t, senderNode.Stop())