mirror of
https://github.com/logos-messaging/logos-messaging-go-bindings.git
synced 2026-01-07 08:23:06 +00:00
improving test
This commit is contained in:
parent
7cc330f147
commit
50f18541dd
@ -3,7 +3,7 @@ package common
|
||||
type StoreQueryRequest struct {
|
||||
RequestId string `json:"request_id"`
|
||||
IncludeData bool `json:"include_data"`
|
||||
PubsubTopic *string `json:"pubsub_topic,omitempty"`
|
||||
PubsubTopic string `json:"pubsub_topic,omitempty"`
|
||||
ContentTopics []string `json:"content_topics,omitempty"`
|
||||
TimeStart *int64 `json:"time_start,omitempty"`
|
||||
TimeEnd *int64 `json:"time_end,omitempty"`
|
||||
|
||||
@ -694,49 +694,108 @@ func TestHash(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
|
||||
|
||||
message := &pb.WakuMessage{
|
||||
Payload: []byte{1, 2, 3, 4, 5, 6},
|
||||
ContentTopic: "test-content-topic",
|
||||
Version: proto.Uint32(0),
|
||||
Timestamp: proto.Int64(time.Now().UnixNano()),
|
||||
}
|
||||
// send message
|
||||
// Send 8 messages
|
||||
numMessages := 8
|
||||
paginationLimit := 5
|
||||
timeStart := proto.Int64(time.Now().UnixNano())
|
||||
hashes := []common.MessageHash{}
|
||||
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel2()
|
||||
|
||||
fmt.Println("----------- publishing message ------------")
|
||||
hash, err := senderNode.RelayPublish(ctx2, message, pubsubTopic)
|
||||
require.NoError(t, err)
|
||||
fmt.Println("----------- RelayPublish returned hash: ", hash)
|
||||
for i := 0; i < numMessages; i++ {
|
||||
message := &pb.WakuMessage{
|
||||
Payload: []byte{byte(i)}, // Include message number in payload
|
||||
ContentTopic: "test-content-topic",
|
||||
Version: proto.Uint32(0),
|
||||
Timestamp: proto.Int64(time.Now().UnixNano()),
|
||||
}
|
||||
|
||||
// Wait to receive message
|
||||
select {
|
||||
case envelope := <-receiverNode.MsgChan:
|
||||
fmt.Println("------- received envelope: ", envelope)
|
||||
require.NotNil(t, envelope, "Envelope should be received")
|
||||
require.Equal(t, message.Payload, envelope.Message().Payload, "Received payload should match")
|
||||
require.Equal(t, message.ContentTopic, envelope.Message().ContentTopic, "Content topic should match")
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout: No message received within 10 seconds")
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel2()
|
||||
|
||||
fmt.Printf("----------- publishing message %d ------------\n", i)
|
||||
hash, err := senderNode.RelayPublish(ctx2, message, pubsubTopic)
|
||||
require.NoError(t, err)
|
||||
hashes = append(hashes, hash)
|
||||
fmt.Printf("----------- RelayPublish %d returned hash: %s\n", i, hash)
|
||||
}
|
||||
fmt.Println("-------- sent hashes: ", hashes)
|
||||
|
||||
// Wait to receive all 8 messages
|
||||
receivedCount := 0
|
||||
receivedMessages := make(map[byte]bool)
|
||||
|
||||
// Use a timeout for the entire receive operation
|
||||
timeoutChan := time.After(10 * time.Second)
|
||||
|
||||
for receivedCount < numMessages {
|
||||
select {
|
||||
case envelope := <-receiverNode.MsgChan:
|
||||
fmt.Printf("------- received envelope: %v\n", envelope)
|
||||
require.NotNil(t, envelope, "Envelope should be received")
|
||||
|
||||
payload := envelope.Message().Payload
|
||||
msgNum := payload[0]
|
||||
|
||||
// Check if we've already received this message number
|
||||
if !receivedMessages[msgNum] {
|
||||
receivedMessages[msgNum] = true
|
||||
receivedCount++
|
||||
fmt.Printf("Received message %d (total: %d)\n", msgNum, receivedCount)
|
||||
}
|
||||
|
||||
require.Equal(t, "test-content-topic", envelope.Message().ContentTopic, "Content topic should match")
|
||||
|
||||
case <-timeoutChan:
|
||||
t.Fatalf("Timeout: Only received %d messages out of 8 within 10 seconds", receivedCount)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify we received all messages
|
||||
for i := 0; i < numMessages; i++ {
|
||||
require.True(t, receivedMessages[byte(i)], fmt.Sprintf("Message %d was not received", i))
|
||||
}
|
||||
|
||||
// Now send store query
|
||||
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel3()
|
||||
|
||||
storeReq := common.StoreQueryRequest{
|
||||
IncludeData: true,
|
||||
ContentTopics: []string{"test-content-topic"},
|
||||
storeReq1 := common.StoreQueryRequest{
|
||||
IncludeData: true,
|
||||
ContentTopics: []string{"test-content-topic"},
|
||||
PaginationLimit: proto.Uint64(uint64(paginationLimit)),
|
||||
PaginationForward: true,
|
||||
TimeStart: timeStart,
|
||||
}
|
||||
|
||||
fmt.Println("------------ storeNode multiaddr: ", receiverMultiaddr[0].String())
|
||||
storeNodeAddrInfo, err := peer.AddrInfoFromString(receiverMultiaddr[0].String())
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := senderNode.StoreQuery(ctx3, &storeReq, *storeNodeAddrInfo)
|
||||
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel3()
|
||||
|
||||
res1, err := senderNode.StoreQuery(ctx3, &storeReq1, *storeNodeAddrInfo)
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("%+v\n", res)
|
||||
fmt.Printf("%+v\n", res1)
|
||||
|
||||
storedMessages := res1.Messages
|
||||
for i := 0; i < paginationLimit; i++ {
|
||||
require.True(t, storedMessages[i].MessageHash == hashes[i], fmt.Sprintf("Stored message does not match received message for index %d", i))
|
||||
}
|
||||
|
||||
// Now let's query the second page
|
||||
storeReq2 := common.StoreQueryRequest{
|
||||
IncludeData: true,
|
||||
ContentTopics: []string{"test-content-topic"},
|
||||
PaginationLimit: proto.Uint64(uint64(paginationLimit)),
|
||||
PaginationForward: true,
|
||||
TimeStart: timeStart,
|
||||
PaginationCursor: res1.PaginationCursor,
|
||||
}
|
||||
|
||||
ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel4()
|
||||
|
||||
res2, err := senderNode.StoreQuery(ctx4, &storeReq2, *storeNodeAddrInfo)
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("%+v\n", res2)
|
||||
|
||||
// Stop nodes
|
||||
require.NoError(t, senderNode.Stop())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user