chore: using msg hash in hex and adding store test (#16)

This commit is contained in:
gabrielmer 2025-01-30 10:15:38 +02:00 committed by GitHub
parent 9c78421cd0
commit a4db2843d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 263 additions and 20 deletions

View File

@ -3,7 +3,6 @@ package common
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
@ -13,13 +12,13 @@ import (
type Envelope interface {
Message() *pb.WakuMessage
PubsubTopic() string
Hash() pb.MessageHash
Hash() MessageHash
}
type envelopeImpl struct {
msg *pb.WakuMessage
topic string
hash pb.MessageHash
hash MessageHash
}
type tmpWakuMessageJson struct {
@ -35,7 +34,7 @@ type tmpWakuMessageJson struct {
type tmpEnvelopeStruct struct {
WakuMessage tmpWakuMessageJson `json:"wakuMessage"`
PubsubTopic string `json:"pubsubTopic"`
MessageHash string `json:"messageHash"`
MessageHash MessageHash `json:"messageHash"`
}
// NewEnvelope creates a new Envelope from a json string generated in nwaku
@ -46,7 +45,6 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) {
return nil, err
}
hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash)
if err != nil {
return nil, err
}
@ -62,7 +60,7 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) {
RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof,
},
topic: tmpEnvelopeStruct.PubsubTopic,
hash: pb.ToMessageHash(hash),
hash: tmpEnvelopeStruct.MessageHash,
}, nil
}
@ -74,6 +72,6 @@ func (e *envelopeImpl) PubsubTopic() string {
return e.topic
}
func (e *envelopeImpl) Hash() pb.MessageHash {
func (e *envelopeImpl) Hash() MessageHash {
return e.hash
}

View File

@ -0,0 +1,39 @@
package common
import (
"encoding/hex"
"errors"
"fmt"
)
// MessageHash represents an unique identifier for a message within a pubsub topic
type MessageHash string
func ToMessageHash(val string) (MessageHash, error) {
if len(val) == 0 {
return "", errors.New("empty string not allowed")
}
if len(val) < 2 || val[:2] != "0x" {
return "", errors.New("string must start with 0x")
}
// Remove "0x" prefix for hex decoding
hexStr := val[2:]
// Verify the remaining string is valid hex
_, err := hex.DecodeString(hexStr)
if err != nil {
return "", fmt.Errorf("invalid hex string: %v", err)
}
return MessageHash(val), nil
}
func (h MessageHash) String() string {
return string(h)
}
func (h MessageHash) Bytes() ([]byte, error) {
return hex.DecodeString(string(h))
}

28
waku/common/store.go Normal file
View File

@ -0,0 +1,28 @@
package common
type StoreQueryRequest struct {
RequestId string `json:"request_id"`
IncludeData bool `json:"include_data"`
PubsubTopic string `json:"pubsub_topic,omitempty"`
ContentTopics *[]string `json:"content_topics,omitempty"`
TimeStart *int64 `json:"time_start,omitempty"`
TimeEnd *int64 `json:"time_end,omitempty"`
MessageHashes *[]MessageHash `json:"message_hashes,omitempty"`
PaginationCursor *MessageHash `json:"pagination_cursor,omitempty"`
PaginationForward bool `json:"pagination_forward"`
PaginationLimit *uint64 `json:"pagination_limit,omitempty"`
}
type StoreMessageResponse struct {
WakuMessage *tmpWakuMessageJson `json:"message"`
PubsubTopic string `json:"pubsubTopic"`
MessageHash MessageHash `json:"messageHash"`
}
type StoreQueryResponse struct {
RequestId string `json:"requestId,omitempty"`
StatusCode *uint32 `json:"statusCode,omitempty"`
StatusDesc string `json:"statusDesc,omitempty"`
Messages *[]StoreMessageResponse `json:"messages,omitempty"`
PaginationCursor MessageHash `json:"paginationCursor,omitempty"`
}

View File

@ -323,7 +323,6 @@ import (
"time"
"unsafe"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
@ -331,7 +330,6 @@ import (
libp2pproto "github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/waku-go-bindings/waku/common"
"go.uber.org/zap"
@ -347,6 +345,7 @@ type WakuConfig struct {
Nodekey string `json:"nodekey,omitempty"`
Relay bool `json:"relay,omitempty"`
Store bool `json:"store,omitempty"`
LegacyStore bool `json:"legacyStore"`
Storenode string `json:"storenode,omitempty"`
StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"`
StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"`
@ -827,7 +826,7 @@ func (n *WakuNode) Version() (string, error) {
return "", errors.New(errMsg)
}
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) {
func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQueryRequest, peerInfo peer.AddrInfo) (*common.StoreQueryResponse, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx)
b, err := json.Marshal(storeRequest)
@ -856,24 +855,24 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQu
if C.getRet(resp) == C.RET_OK {
jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
storeQueryResponse := &storepb.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse)
storeQueryResponse := common.StoreQueryResponse{}
err = json.Unmarshal([]byte(jsonResponseStr), &storeQueryResponse)
if err != nil {
return nil, err
}
return storeQueryResponse, nil
return &storeQueryResponse, nil
}
errMsg := "error WakuStoreQuery: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (common.MessageHash, error) {
timeoutMs := getContextTimeoutMilliseconds(ctx)
jsonMsg, err := json.Marshal(message)
if err != nil {
return pb.MessageHash{}, err
return common.MessageHash(""), err
}
wg := sync.WaitGroup{}
@ -890,14 +889,14 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu
wg.Wait()
if C.getRet(resp) == C.RET_OK {
msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
msgHashBytes, err := hexutil.Decode(msgHash)
parsedMsgHash, err := common.ToMessageHash(msgHash)
if err != nil {
return pb.MessageHash{}, err
return common.MessageHash(""), err
}
return pb.ToMessageHash(msgHashBytes), nil
return parsedMsgHash, nil
}
errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return pb.MessageHash{}, errors.New(errMsg)
return common.MessageHash(""), errors.New(errMsg)
}
func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) {

View File

@ -3,6 +3,7 @@ package waku
import (
"context"
"errors"
"fmt"
"slices"
"testing"
"time"
@ -16,6 +17,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/waku-go-bindings/waku/common"
)
// In order to run this test, you must run an nwaku node
@ -457,7 +459,7 @@ func TestRelay(t *testing.T) {
Payload: []byte{1, 2, 3, 4, 5, 6},
ContentTopic: "test-content-topic",
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
Timestamp: proto.Int64(time.Now().UnixNano()),
}
// send message
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
@ -633,3 +635,180 @@ func TestConnectionChange(t *testing.T) {
require.NoError(t, node1.Stop())
require.NoError(t, node2.Stop())
}
func TestStore(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
// start node that will send the message
senderNodeWakuConfig := WakuConfig{
Relay: true,
Store: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9070,
TcpPort: 60070,
LegacyStore: false,
}
senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode"))
require.NoError(t, err)
require.NoError(t, senderNode.Start())
// start node that will receive the message
receiverNodeWakuConfig := WakuConfig{
Relay: true,
Store: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9071,
TcpPort: 60071,
LegacyStore: false,
}
receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode"))
require.NoError(t, err)
require.NoError(t, receiverNode.Start())
receiverMultiaddr, err := receiverNode.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, receiverMultiaddr)
require.True(t, len(receiverMultiaddr) > 0)
// Dial so they become peers
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
err = senderNode.Connect(ctx, receiverMultiaddr[0])
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Check that both nodes now have one connected peer
senderPeerCount, err := senderNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer")
receiverPeerCount, err := receiverNode.GetNumConnectedPeers()
require.NoError(t, err)
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")
// Send 8 messages
numMessages := 8
paginationLimit := 5
timeStart := proto.Int64(time.Now().UnixNano())
hashes := []common.MessageHash{}
pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0])
for i := 0; i < numMessages; i++ {
message := &pb.WakuMessage{
Payload: []byte{byte(i)}, // Include message number in payload
ContentTopic: "test-content-topic",
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().UnixNano()),
}
ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel2()
hash, err := senderNode.RelayPublish(ctx2, message, pubsubTopic)
require.NoError(t, err)
hashes = append(hashes, hash)
}
// Wait to receive all 8 messages
receivedCount := 0
receivedMessages := make(map[byte]bool)
// Use a timeout for the entire receive operation
timeoutChan := time.After(10 * time.Second)
for receivedCount < numMessages {
select {
case envelope := <-receiverNode.MsgChan:
require.NotNil(t, envelope, "Envelope should be received")
payload := envelope.Message().Payload
msgNum := payload[0]
// Check if we've already received this message number
if !receivedMessages[msgNum] {
receivedMessages[msgNum] = true
receivedCount++
}
require.Equal(t, "test-content-topic", envelope.Message().ContentTopic, "Content topic should match")
case <-timeoutChan:
t.Fatalf("Timeout: Only received %d messages out of 8 within 10 seconds", receivedCount)
}
}
// Verify we received all messages
for i := 0; i < numMessages; i++ {
require.True(t, receivedMessages[byte(i)], fmt.Sprintf("Message %d was not received", i))
}
// Now send store query
storeReq1 := common.StoreQueryRequest{
IncludeData: true,
ContentTopics: &[]string{"test-content-topic"},
PaginationLimit: proto.Uint64(uint64(paginationLimit)),
PaginationForward: true,
TimeStart: timeStart,
}
storeNodeAddrInfo, err := peer.AddrInfoFromString(receiverMultiaddr[0].String())
require.NoError(t, err)
ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel3()
res1, err := senderNode.StoreQuery(ctx3, &storeReq1, *storeNodeAddrInfo)
require.NoError(t, err)
storedMessages1 := *res1.Messages
for i := 0; i < paginationLimit; i++ {
require.True(t, storedMessages1[i].MessageHash == hashes[i], fmt.Sprintf("Stored message does not match received message for index %d", i))
}
// Now let's query the second page
storeReq2 := common.StoreQueryRequest{
IncludeData: true,
ContentTopics: &[]string{"test-content-topic"},
PaginationLimit: proto.Uint64(uint64(paginationLimit)),
PaginationForward: true,
TimeStart: timeStart,
PaginationCursor: &res1.PaginationCursor,
}
ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel4()
res2, err := senderNode.StoreQuery(ctx4, &storeReq2, *storeNodeAddrInfo)
require.NoError(t, err)
storedMessages2 := *res2.Messages
for i := 0; i < len(storedMessages2); i++ {
require.True(t, storedMessages2[i].MessageHash == hashes[i+paginationLimit], fmt.Sprintf("Stored message does not match received message for index %d", i))
}
// Now let's query for two specific message hashes
storeReq3 := common.StoreQueryRequest{
IncludeData: true,
ContentTopics: &[]string{"test-content-topic"},
MessageHashes: &[]common.MessageHash{hashes[0], hashes[2]},
}
ctx5, cancel5 := context.WithTimeout(context.Background(), requestTimeout)
defer cancel5()
res3, err := senderNode.StoreQuery(ctx5, &storeReq3, *storeNodeAddrInfo)
require.NoError(t, err)
storedMessages3 := *res3.Messages
require.True(t, storedMessages3[0].MessageHash == hashes[0], "Stored message does not match queried message")
require.True(t, storedMessages3[1].MessageHash == hashes[2], "Stored message does not match queried message")
// Stop nodes
require.NoError(t, senderNode.Stop())
require.NoError(t, receiverNode.Stop())
}