From 053f3f254045ecf1b4aa9936214532816b1a899f Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 16 Feb 2023 23:35:22 -0400 Subject: [PATCH] fix: convert pb.WakuMessage to RPCWakuMessage so version is included in replies --- waku/v2/rpc/filter.go | 4 +- waku/v2/rpc/private.go | 24 +++++++---- waku/v2/rpc/private_test.go | 8 ++-- waku/v2/rpc/relay.go | 12 +++--- waku/v2/rpc/relay_test.go | 10 ++--- waku/v2/rpc/rpc_type.go | 6 +-- waku/v2/rpc/store.go | 8 ++-- waku/v2/rpc/utils.go | 80 +++++++++++++++++++++++++++++++++++++ 8 files changed, 121 insertions(+), 31 deletions(-) create mode 100644 waku/v2/rpc/utils.go diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index 7e573e28..53d98be9 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -124,7 +124,9 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs, return fmt.Errorf("topic %s not subscribed", args.ContentTopic) } - *reply = f.messages[args.ContentTopic] + for i := range f.messages[args.ContentTopic] { + *reply = append(*reply, ProtoToRPC(f.messages[args.ContentTopic][i])) + } f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0) return nil diff --git a/waku/v2/rpc/private.go b/waku/v2/rpc/private.go index 593598f0..0075f20e 100644 --- a/waku/v2/rpc/private.go +++ b/waku/v2/rpc/private.go @@ -39,13 +39,13 @@ type KeyPairReply struct { type SymmetricMessageArgs struct { Topic string `json:"topic"` - Message *pb.WakuMessage `json:"message"` + Message *RPCWakuMessage `json:"message"` SymKey string `json:"symkey"` } type AsymmetricMessageArgs struct { Topic string `json:"topic"` - Message *pb.WakuMessage `json:"message"` + Message *RPCWakuMessage `json:"message"` PublicKey string `json:"publicKey"` } @@ -128,7 +128,9 @@ func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *Symmetr msg := args.Message msg.Version = 1 - err = payload.EncodeWakuMessage(msg, keyInfo) + protoMsg := msg.toProto() + + err = payload.EncodeWakuMessage(protoMsg, keyInfo) if err != nil { return err } @@ -138,7 +140,7 @@ func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *Symmetr topic = relay.DefaultWakuTopic } - _, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic) + _, err = p.node.Relay().PublishToTopic(req.Context(), protoMsg, topic) if err != nil { return err } @@ -166,7 +168,9 @@ func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Asymme msg := args.Message msg.Version = 1 - err = payload.EncodeWakuMessage(msg, keyInfo) + protoMsg := msg.toProto() + + err = payload.EncodeWakuMessage(protoMsg, keyInfo) if err != nil { return err } @@ -176,7 +180,7 @@ func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Asymme topic = relay.DefaultWakuTopic } - _, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic) + _, err = p.node.Relay().PublishToTopic(req.Context(), protoMsg, topic) if err != nil { return err } @@ -214,7 +218,9 @@ func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *Symmetr decodedMessages = append(decodedMessages, msg) } - *reply = decodedMessages + for i := range decodedMessages { + *reply = append(*reply, ProtoToRPC(decodedMessages[i])) + } return nil } @@ -248,7 +254,9 @@ func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *Asymme decodedMessages = append(decodedMessages, msg) } - *reply = decodedMessages + for i := range decodedMessages { + *reply = append(*reply, ProtoToRPC(decodedMessages[i])) + } return nil } diff --git a/waku/v2/rpc/private_test.go b/waku/v2/rpc/private_test.go index 32da49b6..5410d305 100644 --- a/waku/v2/rpc/private_test.go +++ b/waku/v2/rpc/private_test.go @@ -60,7 +60,7 @@ func TestPostV1SymmetricMessage(t *testing.T) { makeRequest(t), &SymmetricMessageArgs{ Topic: "test", - Message: &pb.WakuMessage{Payload: []byte("test")}, + Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}), SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122", }, &reply, @@ -78,7 +78,7 @@ func TestPostV1AsymmetricMessage(t *testing.T) { makeRequest(t), &AsymmetricMessageArgs{ Topic: "test", - Message: &pb.WakuMessage{Payload: []byte("test")}, + Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}), PublicKey: "0x045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", }, &reply, @@ -101,7 +101,7 @@ func TestGetV1SymmetricMessages(t *testing.T) { makeRequest(t), &SymmetricMessageArgs{ Topic: "test", - Message: &pb.WakuMessage{Payload: []byte("test")}, + Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}), SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122", }, &reply, @@ -141,7 +141,7 @@ func TestGetV1AsymmetricMessages(t *testing.T) { makeRequest(t), &AsymmetricMessageArgs{ Topic: "test", - Message: &pb.WakuMessage{Payload: []byte("test")}, + Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}), PublicKey: hexutil.Encode(crypto.FromECDSAPub(&prvKey.PublicKey)), }, &reply, diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index ab70c1a4..180b59d4 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -26,7 +26,7 @@ type RelayService struct { type RelayMessageArgs struct { Topic string `json:"topic,omitempty"` - Message *pb.WakuMessage `json:"message,omitempty"` + Message *RPCWakuMessage `json:"message,omitempty"` } type TopicsArgs struct { @@ -86,9 +86,9 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, var err error if args.Topic == "" { - _, err = r.node.Relay().Publish(req.Context(), args.Message) + _, err = r.node.Relay().Publish(req.Context(), args.Message.toProto()) } else { - _, err = r.node.Relay().PublishToTopic(req.Context(), args.Message, args.Topic) + _, err = r.node.Relay().PublishToTopic(req.Context(), args.Message.toProto(), args.Topic) } if err != nil { r.log.Error("publishing message", zap.Error(err)) @@ -141,7 +141,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, return nil } -func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *RelayMessagesReply) error { +func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { r.messagesMutex.Lock() defer r.messagesMutex.Unlock() @@ -149,7 +149,9 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply * return fmt.Errorf("topic %s not subscribed", args.Topic) } - *reply = r.messages[args.Topic] + for i := range r.messages[args.Topic] { + *reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i])) + } r.messages[args.Topic] = make([]*pb.WakuMessage, 0) diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index 3cd95f3b..61427faf 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -38,7 +38,7 @@ func TestPostV1Message(t *testing.T) { err := d.PostV1Message( makeRequest(t), &RelayMessageArgs{ - Message: msg, + Message: ProtoToRPC(msg), }, &reply, ) @@ -110,9 +110,9 @@ func TestRelayGetV1Messages(t *testing.T) { makeRequest(t), &RelayMessageArgs{ Topic: "test", - Message: &pb.WakuMessage{ + Message: ProtoToRPC(&pb.WakuMessage{ Payload: []byte("test"), - }, + }), }, &reply, ) @@ -122,7 +122,7 @@ func TestRelayGetV1Messages(t *testing.T) { // Wait for the message to be received time.Sleep(1 * time.Second) - var messagesReply1 RelayMessagesReply + var messagesReply1 MessagesReply err = serviceB.GetV1Messages( makeRequest(t), &TopicArgs{"test"}, @@ -131,7 +131,7 @@ func TestRelayGetV1Messages(t *testing.T) { require.NoError(t, err) require.Len(t, messagesReply1, 1) - var messagesReply2 RelayMessagesReply + var messagesReply2 MessagesReply err = serviceB.GetV1Messages( makeRequest(t), &TopicArgs{"test"}, diff --git a/waku/v2/rpc/rpc_type.go b/waku/v2/rpc/rpc_type.go index f33e97e3..720d08ec 100644 --- a/waku/v2/rpc/rpc_type.go +++ b/waku/v2/rpc/rpc_type.go @@ -1,12 +1,8 @@ package rpc -import "github.com/waku-org/go-waku/waku/v2/protocol/pb" - type SuccessReply = bool type Empty struct { } -type MessagesReply = []*pb.WakuMessage - -type RelayMessagesReply = []*pb.WakuMessage +type MessagesReply = []*RPCWakuMessage diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go index df0bb0ec..f3b08ecc 100644 --- a/waku/v2/rpc/store.go +++ b/waku/v2/rpc/store.go @@ -4,7 +4,6 @@ import ( "net/http" "github.com/waku-org/go-waku/waku/v2/node" - wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "go.uber.org/zap" @@ -34,7 +33,7 @@ type StoreMessagesArgs struct { } type StoreMessagesReply struct { - Messages []*wpb.WakuMessage `json:"messages,omitempty"` + Messages []*RPCWakuMessage `json:"messages,omitempty"` PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"` Error string `json:"error,omitempty"` } @@ -62,7 +61,10 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, return nil } - reply.Messages = res.Messages + reply.Messages = make([]*RPCWakuMessage, len(res.Messages)) + for i := range res.Messages { + reply.Messages[i] = ProtoToRPC(res.Messages[i]) + } reply.PagingInfo = StorePagingOptions{ PageSize: args.PagingOptions.PageSize, diff --git a/waku/v2/rpc/utils.go b/waku/v2/rpc/utils.go new file mode 100644 index 00000000..b836e071 --- /dev/null +++ b/waku/v2/rpc/utils.go @@ -0,0 +1,80 @@ +package rpc + +import ( + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +type RateLimitProof struct { + Proof []byte `json:"proof,omitempty"` + MerkleRoot []byte `json:"merkle_root,omitempty"` + Epoch []byte `json:"epoch,omitempty"` + ShareX []byte `json:"share_x,omitempty"` + ShareY []byte `json:"share_y,omitempty"` + Nullifier []byte `json:"nullifier,omitempty"` + RlnIdentifier []byte `json:"rln_identifier,omitempty"` +} + +type RPCWakuMessage struct { + Payload []byte `json:"payload,omitempty"` + ContentTopic string `json:"contentTopic,omitempty"` + Version uint32 `json:"version"` + Timestamp int64 `json:"timestamp,omitempty"` + RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"` + Ephemeral bool `json:"ephemeral,omitempty"` +} + +func ProtoToRPC(input *pb.WakuMessage) *RPCWakuMessage { + if input == nil { + return nil + } + + rpcWakuMsg := &RPCWakuMessage{ + Payload: input.Payload, + ContentTopic: input.ContentTopic, + Version: input.Version, + Timestamp: input.Timestamp, + Ephemeral: input.Ephemeral, + } + + if input.RateLimitProof != nil { + rpcWakuMsg.RateLimitProof = &RateLimitProof{ + Proof: input.RateLimitProof.Proof, + MerkleRoot: input.RateLimitProof.MerkleRoot, + Epoch: input.RateLimitProof.Epoch, + ShareX: input.RateLimitProof.ShareX, + ShareY: input.RateLimitProof.ShareY, + Nullifier: input.RateLimitProof.Nullifier, + RlnIdentifier: input.RateLimitProof.RlnIdentifier, + } + } + + return rpcWakuMsg +} + +func (r *RPCWakuMessage) toProto() *pb.WakuMessage { + if r == nil { + return nil + } + + msg := &pb.WakuMessage{ + Payload: r.Payload, + ContentTopic: r.ContentTopic, + Version: r.Version, + Timestamp: r.Timestamp, + Ephemeral: r.Ephemeral, + } + + if r.RateLimitProof != nil { + msg.RateLimitProof = &pb.RateLimitProof{ + Proof: r.RateLimitProof.Proof, + MerkleRoot: r.RateLimitProof.MerkleRoot, + Epoch: r.RateLimitProof.Epoch, + ShareX: r.RateLimitProof.ShareX, + ShareY: r.RateLimitProof.ShareY, + Nullifier: r.RateLimitProof.Nullifier, + RlnIdentifier: r.RateLimitProof.RlnIdentifier, + } + } + + return msg +}