diff --git a/waku/common/envelope.go b/waku/common/envelope.go index dbf80cb..f21931a 100644 --- a/waku/common/envelope.go +++ b/waku/common/envelope.go @@ -3,7 +3,6 @@ package common import ( "encoding/json" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -13,13 +12,13 @@ import ( type Envelope interface { Message() *pb.WakuMessage PubsubTopic() string - Hash() pb.MessageHash + Hash() MessageHash } type envelopeImpl struct { msg *pb.WakuMessage topic string - hash pb.MessageHash + hash MessageHash } type tmpWakuMessageJson struct { @@ -35,7 +34,7 @@ type tmpWakuMessageJson struct { type tmpEnvelopeStruct struct { WakuMessage tmpWakuMessageJson `json:"wakuMessage"` PubsubTopic string `json:"pubsubTopic"` - MessageHash string `json:"messageHash"` + MessageHash MessageHash `json:"messageHash"` } // NewEnvelope creates a new Envelope from a json string generated in nwaku @@ -46,7 +45,6 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) { return nil, err } - hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash) if err != nil { return nil, err } @@ -62,7 +60,7 @@ func NewEnvelope(jsonEventStr string) (Envelope, error) { RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof, }, topic: tmpEnvelopeStruct.PubsubTopic, - hash: pb.ToMessageHash(hash), + hash: tmpEnvelopeStruct.MessageHash, }, nil } @@ -74,6 +72,6 @@ func (e *envelopeImpl) PubsubTopic() string { return e.topic } -func (e *envelopeImpl) Hash() pb.MessageHash { +func (e *envelopeImpl) Hash() MessageHash { return e.hash } diff --git a/waku/common/message_hash.go b/waku/common/message_hash.go new file mode 100644 index 0000000..c51e480 --- /dev/null +++ b/waku/common/message_hash.go @@ -0,0 +1,39 @@ +package common + +import ( + "encoding/hex" + "errors" + "fmt" +) + +// MessageHash represents an unique identifier for a message within a pubsub topic +type MessageHash string + +func ToMessageHash(val string) (MessageHash, error) { + if len(val) == 0 { + return "", errors.New("empty string not allowed") + } + + if len(val) < 2 || val[:2] != "0x" { + return "", errors.New("string must start with 0x") + } + + // Remove "0x" prefix for hex decoding + hexStr := val[2:] + + // Verify the remaining string is valid hex + _, err := hex.DecodeString(hexStr) + if err != nil { + return "", fmt.Errorf("invalid hex string: %v", err) + } + + return MessageHash(val), nil +} + +func (h MessageHash) String() string { + return string(h) +} + +func (h MessageHash) Bytes() ([]byte, error) { + return hex.DecodeString(string(h)) +} diff --git a/waku/common/store.go b/waku/common/store.go new file mode 100644 index 0000000..32e76ad --- /dev/null +++ b/waku/common/store.go @@ -0,0 +1,28 @@ +package common + +type StoreQueryRequest struct { + RequestId string `json:"request_id"` + IncludeData bool `json:"include_data"` + PubsubTopic string `json:"pubsub_topic,omitempty"` + ContentTopics *[]string `json:"content_topics,omitempty"` + TimeStart *int64 `json:"time_start,omitempty"` + TimeEnd *int64 `json:"time_end,omitempty"` + MessageHashes *[]MessageHash `json:"message_hashes,omitempty"` + PaginationCursor *MessageHash `json:"pagination_cursor,omitempty"` + PaginationForward bool `json:"pagination_forward"` + PaginationLimit *uint64 `json:"pagination_limit,omitempty"` +} + +type StoreMessageResponse struct { + WakuMessage *tmpWakuMessageJson `json:"message"` + PubsubTopic string `json:"pubsubTopic"` + MessageHash MessageHash `json:"messageHash"` +} + +type StoreQueryResponse struct { + RequestId string `json:"requestId,omitempty"` + StatusCode *uint32 `json:"statusCode,omitempty"` + StatusDesc string `json:"statusDesc,omitempty"` + Messages *[]StoreMessageResponse `json:"messages,omitempty"` + PaginationCursor MessageHash `json:"paginationCursor,omitempty"` +} diff --git a/waku/nwaku.go b/waku/nwaku.go index 9f29c27..3c46fb3 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -323,7 +323,6 @@ import ( "time" "unsafe" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" @@ -331,7 +330,6 @@ import ( libp2pproto "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/waku-go-bindings/waku/common" "go.uber.org/zap" @@ -347,6 +345,7 @@ type WakuConfig struct { Nodekey string `json:"nodekey,omitempty"` Relay bool `json:"relay,omitempty"` Store bool `json:"store,omitempty"` + LegacyStore bool `json:"legacyStore"` Storenode string `json:"storenode,omitempty"` StoreMessageRetentionPolicy string `json:"storeMessageRetentionPolicy,omitempty"` StoreMessageDbUrl string `json:"storeMessageDbUrl,omitempty"` @@ -827,7 +826,7 @@ func (n *WakuNode) Version() (string, error) { return "", errors.New(errMsg) } -func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQueryRequest, peerInfo peer.AddrInfo) (*storepb.StoreQueryResponse, error) { +func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *common.StoreQueryRequest, peerInfo peer.AddrInfo) (*common.StoreQueryResponse, error) { timeoutMs := getContextTimeoutMilliseconds(ctx) b, err := json.Marshal(storeRequest) @@ -856,24 +855,24 @@ func (n *WakuNode) StoreQuery(ctx context.Context, storeRequest *storepb.StoreQu if C.getRet(resp) == C.RET_OK { jsonResponseStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - storeQueryResponse := &storepb.StoreQueryResponse{} - err = json.Unmarshal([]byte(jsonResponseStr), storeQueryResponse) + storeQueryResponse := common.StoreQueryResponse{} + err = json.Unmarshal([]byte(jsonResponseStr), &storeQueryResponse) if err != nil { return nil, err } - return storeQueryResponse, nil + return &storeQueryResponse, nil } errMsg := "error WakuStoreQuery: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) return nil, errors.New(errMsg) } -func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) { +func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (common.MessageHash, error) { timeoutMs := getContextTimeoutMilliseconds(ctx) jsonMsg, err := json.Marshal(message) if err != nil { - return pb.MessageHash{}, err + return common.MessageHash(""), err } wg := sync.WaitGroup{} @@ -890,14 +889,14 @@ func (n *WakuNode) RelayPublish(ctx context.Context, message *pb.WakuMessage, pu wg.Wait() if C.getRet(resp) == C.RET_OK { msgHash := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - msgHashBytes, err := hexutil.Decode(msgHash) + parsedMsgHash, err := common.ToMessageHash(msgHash) if err != nil { - return pb.MessageHash{}, err + return common.MessageHash(""), err } - return pb.ToMessageHash(msgHashBytes), nil + return parsedMsgHash, nil } errMsg := "WakuRelayPublish: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return pb.MessageHash{}, errors.New(errMsg) + return common.MessageHash(""), errors.New(errMsg) } func (n *WakuNode) DnsDiscovery(ctx context.Context, enrTreeUrl string, nameDnsServer string) ([]multiaddr.Multiaddr, error) { diff --git a/waku/nwaku_test.go b/waku/nwaku_test.go index 09db846..ba7ed71 100644 --- a/waku/nwaku_test.go +++ b/waku/nwaku_test.go @@ -3,6 +3,7 @@ package waku import ( "context" "errors" + "fmt" "slices" "testing" "time" @@ -16,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/waku-go-bindings/waku/common" ) // In order to run this test, you must run an nwaku node @@ -457,7 +459,7 @@ func TestRelay(t *testing.T) { Payload: []byte{1, 2, 3, 4, 5, 6}, ContentTopic: "test-content-topic", Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().Unix()), + Timestamp: proto.Int64(time.Now().UnixNano()), } // send message pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0]) @@ -633,3 +635,180 @@ func TestConnectionChange(t *testing.T) { require.NoError(t, node1.Stop()) require.NoError(t, node2.Stop()) } + +func TestStore(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + // start node that will send the message + senderNodeWakuConfig := WakuConfig{ + Relay: true, + Store: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9070, + TcpPort: 60070, + LegacyStore: false, + } + + senderNode, err := NewWakuNode(&senderNodeWakuConfig, logger.Named("senderNode")) + require.NoError(t, err) + require.NoError(t, senderNode.Start()) + + // start node that will receive the message + receiverNodeWakuConfig := WakuConfig{ + Relay: true, + Store: true, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + Discv5UdpPort: 9071, + TcpPort: 60071, + LegacyStore: false, + } + receiverNode, err := NewWakuNode(&receiverNodeWakuConfig, logger.Named("receiverNode")) + require.NoError(t, err) + require.NoError(t, receiverNode.Start()) + receiverMultiaddr, err := receiverNode.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, receiverMultiaddr) + require.True(t, len(receiverMultiaddr) > 0) + + // Dial so they become peers + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + err = senderNode.Connect(ctx, receiverMultiaddr[0]) + require.NoError(t, err) + time.Sleep(1 * time.Second) + // Check that both nodes now have one connected peer + senderPeerCount, err := senderNode.GetNumConnectedPeers() + require.NoError(t, err) + require.True(t, senderPeerCount == 1, "Dialer node should have 1 peer") + receiverPeerCount, err := receiverNode.GetNumConnectedPeers() + require.NoError(t, err) + require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer") + + // Send 8 messages + numMessages := 8 + paginationLimit := 5 + timeStart := proto.Int64(time.Now().UnixNano()) + hashes := []common.MessageHash{} + pubsubTopic := FormatWakuRelayTopic(senderNodeWakuConfig.ClusterID, senderNodeWakuConfig.Shards[0]) + + for i := 0; i < numMessages; i++ { + message := &pb.WakuMessage{ + Payload: []byte{byte(i)}, // Include message number in payload + ContentTopic: "test-content-topic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().UnixNano()), + } + + ctx2, cancel2 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel2() + + hash, err := senderNode.RelayPublish(ctx2, message, pubsubTopic) + require.NoError(t, err) + hashes = append(hashes, hash) + } + + // Wait to receive all 8 messages + receivedCount := 0 + receivedMessages := make(map[byte]bool) + + // Use a timeout for the entire receive operation + timeoutChan := time.After(10 * time.Second) + + for receivedCount < numMessages { + select { + case envelope := <-receiverNode.MsgChan: + require.NotNil(t, envelope, "Envelope should be received") + + payload := envelope.Message().Payload + msgNum := payload[0] + + // Check if we've already received this message number + if !receivedMessages[msgNum] { + receivedMessages[msgNum] = true + receivedCount++ + } + + require.Equal(t, "test-content-topic", envelope.Message().ContentTopic, "Content topic should match") + + case <-timeoutChan: + t.Fatalf("Timeout: Only received %d messages out of 8 within 10 seconds", receivedCount) + } + } + + // Verify we received all messages + for i := 0; i < numMessages; i++ { + require.True(t, receivedMessages[byte(i)], fmt.Sprintf("Message %d was not received", i)) + } + + // Now send store query + storeReq1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(paginationLimit)), + PaginationForward: true, + TimeStart: timeStart, + } + + storeNodeAddrInfo, err := peer.AddrInfoFromString(receiverMultiaddr[0].String()) + require.NoError(t, err) + + ctx3, cancel3 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel3() + + res1, err := senderNode.StoreQuery(ctx3, &storeReq1, *storeNodeAddrInfo) + require.NoError(t, err) + + storedMessages1 := *res1.Messages + for i := 0; i < paginationLimit; i++ { + require.True(t, storedMessages1[i].MessageHash == hashes[i], fmt.Sprintf("Stored message does not match received message for index %d", i)) + } + + // Now let's query the second page + storeReq2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(paginationLimit)), + PaginationForward: true, + TimeStart: timeStart, + PaginationCursor: &res1.PaginationCursor, + } + + ctx4, cancel4 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel4() + + res2, err := senderNode.StoreQuery(ctx4, &storeReq2, *storeNodeAddrInfo) + require.NoError(t, err) + + storedMessages2 := *res2.Messages + for i := 0; i < len(storedMessages2); i++ { + require.True(t, storedMessages2[i].MessageHash == hashes[i+paginationLimit], fmt.Sprintf("Stored message does not match received message for index %d", i)) + } + + // Now let's query for two specific message hashes + storeReq3 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + MessageHashes: &[]common.MessageHash{hashes[0], hashes[2]}, + } + + ctx5, cancel5 := context.WithTimeout(context.Background(), requestTimeout) + defer cancel5() + + res3, err := senderNode.StoreQuery(ctx5, &storeReq3, *storeNodeAddrInfo) + require.NoError(t, err) + + storedMessages3 := *res3.Messages + require.True(t, storedMessages3[0].MessageHash == hashes[0], "Stored message does not match queried message") + require.True(t, storedMessages3[1].MessageHash == hashes[2], "Stored message does not match queried message") + + // Stop nodes + require.NoError(t, senderNode.Stop()) + require.NoError(t, receiverNode.Stop()) +}