fix: convert pb.WakuMessage to RPCWakuMessage so version is included in replies

This commit is contained in:
Richard Ramos 2023-02-16 23:35:22 -04:00 committed by RichΛrd
parent 9c75a3325d
commit 053f3f2540
8 changed files with 121 additions and 31 deletions

View File

@ -124,7 +124,9 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs,
return fmt.Errorf("topic %s not subscribed", args.ContentTopic) return fmt.Errorf("topic %s not subscribed", args.ContentTopic)
} }
*reply = f.messages[args.ContentTopic] for i := range f.messages[args.ContentTopic] {
*reply = append(*reply, ProtoToRPC(f.messages[args.ContentTopic][i]))
}
f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0) f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0)
return nil return nil

View File

@ -39,13 +39,13 @@ type KeyPairReply struct {
type SymmetricMessageArgs struct { type SymmetricMessageArgs struct {
Topic string `json:"topic"` Topic string `json:"topic"`
Message *pb.WakuMessage `json:"message"` Message *RPCWakuMessage `json:"message"`
SymKey string `json:"symkey"` SymKey string `json:"symkey"`
} }
type AsymmetricMessageArgs struct { type AsymmetricMessageArgs struct {
Topic string `json:"topic"` Topic string `json:"topic"`
Message *pb.WakuMessage `json:"message"` Message *RPCWakuMessage `json:"message"`
PublicKey string `json:"publicKey"` PublicKey string `json:"publicKey"`
} }
@ -128,7 +128,9 @@ func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *Symmetr
msg := args.Message msg := args.Message
msg.Version = 1 msg.Version = 1
err = payload.EncodeWakuMessage(msg, keyInfo) protoMsg := msg.toProto()
err = payload.EncodeWakuMessage(protoMsg, keyInfo)
if err != nil { if err != nil {
return err return err
} }
@ -138,7 +140,7 @@ func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *Symmetr
topic = relay.DefaultWakuTopic topic = relay.DefaultWakuTopic
} }
_, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic) _, err = p.node.Relay().PublishToTopic(req.Context(), protoMsg, topic)
if err != nil { if err != nil {
return err return err
} }
@ -166,7 +168,9 @@ func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Asymme
msg := args.Message msg := args.Message
msg.Version = 1 msg.Version = 1
err = payload.EncodeWakuMessage(msg, keyInfo) protoMsg := msg.toProto()
err = payload.EncodeWakuMessage(protoMsg, keyInfo)
if err != nil { if err != nil {
return err return err
} }
@ -176,7 +180,7 @@ func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Asymme
topic = relay.DefaultWakuTopic topic = relay.DefaultWakuTopic
} }
_, err = p.node.Relay().PublishToTopic(req.Context(), msg, topic) _, err = p.node.Relay().PublishToTopic(req.Context(), protoMsg, topic)
if err != nil { if err != nil {
return err return err
} }
@ -214,7 +218,9 @@ func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *Symmetr
decodedMessages = append(decodedMessages, msg) decodedMessages = append(decodedMessages, msg)
} }
*reply = decodedMessages for i := range decodedMessages {
*reply = append(*reply, ProtoToRPC(decodedMessages[i]))
}
return nil return nil
} }
@ -248,7 +254,9 @@ func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *Asymme
decodedMessages = append(decodedMessages, msg) decodedMessages = append(decodedMessages, msg)
} }
*reply = decodedMessages for i := range decodedMessages {
*reply = append(*reply, ProtoToRPC(decodedMessages[i]))
}
return nil return nil
} }

View File

@ -60,7 +60,7 @@ func TestPostV1SymmetricMessage(t *testing.T) {
makeRequest(t), makeRequest(t),
&SymmetricMessageArgs{ &SymmetricMessageArgs{
Topic: "test", Topic: "test",
Message: &pb.WakuMessage{Payload: []byte("test")}, Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}),
SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122", SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122",
}, },
&reply, &reply,
@ -78,7 +78,7 @@ func TestPostV1AsymmetricMessage(t *testing.T) {
makeRequest(t), makeRequest(t),
&AsymmetricMessageArgs{ &AsymmetricMessageArgs{
Topic: "test", Topic: "test",
Message: &pb.WakuMessage{Payload: []byte("test")}, Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}),
PublicKey: "0x045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", PublicKey: "0x045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5",
}, },
&reply, &reply,
@ -101,7 +101,7 @@ func TestGetV1SymmetricMessages(t *testing.T) {
makeRequest(t), makeRequest(t),
&SymmetricMessageArgs{ &SymmetricMessageArgs{
Topic: "test", Topic: "test",
Message: &pb.WakuMessage{Payload: []byte("test")}, Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}),
SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122", SymKey: "0x1122334455667788991011223344556677889910112233445566778899101122",
}, },
&reply, &reply,
@ -141,7 +141,7 @@ func TestGetV1AsymmetricMessages(t *testing.T) {
makeRequest(t), makeRequest(t),
&AsymmetricMessageArgs{ &AsymmetricMessageArgs{
Topic: "test", Topic: "test",
Message: &pb.WakuMessage{Payload: []byte("test")}, Message: ProtoToRPC(&pb.WakuMessage{Payload: []byte("test")}),
PublicKey: hexutil.Encode(crypto.FromECDSAPub(&prvKey.PublicKey)), PublicKey: hexutil.Encode(crypto.FromECDSAPub(&prvKey.PublicKey)),
}, },
&reply, &reply,

View File

@ -26,7 +26,7 @@ type RelayService struct {
type RelayMessageArgs struct { type RelayMessageArgs struct {
Topic string `json:"topic,omitempty"` Topic string `json:"topic,omitempty"`
Message *pb.WakuMessage `json:"message,omitempty"` Message *RPCWakuMessage `json:"message,omitempty"`
} }
type TopicsArgs struct { type TopicsArgs struct {
@ -86,9 +86,9 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
var err error var err error
if args.Topic == "" { if args.Topic == "" {
_, err = r.node.Relay().Publish(req.Context(), args.Message) _, err = r.node.Relay().Publish(req.Context(), args.Message.toProto())
} else { } else {
_, err = r.node.Relay().PublishToTopic(req.Context(), args.Message, args.Topic) _, err = r.node.Relay().PublishToTopic(req.Context(), args.Message.toProto(), args.Topic)
} }
if err != nil { if err != nil {
r.log.Error("publishing message", zap.Error(err)) r.log.Error("publishing message", zap.Error(err))
@ -141,7 +141,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
return nil return nil
} }
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *RelayMessagesReply) error { func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
r.messagesMutex.Lock() r.messagesMutex.Lock()
defer r.messagesMutex.Unlock() defer r.messagesMutex.Unlock()
@ -149,7 +149,9 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *
return fmt.Errorf("topic %s not subscribed", args.Topic) return fmt.Errorf("topic %s not subscribed", args.Topic)
} }
*reply = r.messages[args.Topic] for i := range r.messages[args.Topic] {
*reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i]))
}
r.messages[args.Topic] = make([]*pb.WakuMessage, 0) r.messages[args.Topic] = make([]*pb.WakuMessage, 0)

View File

@ -38,7 +38,7 @@ func TestPostV1Message(t *testing.T) {
err := d.PostV1Message( err := d.PostV1Message(
makeRequest(t), makeRequest(t),
&RelayMessageArgs{ &RelayMessageArgs{
Message: msg, Message: ProtoToRPC(msg),
}, },
&reply, &reply,
) )
@ -110,9 +110,9 @@ func TestRelayGetV1Messages(t *testing.T) {
makeRequest(t), makeRequest(t),
&RelayMessageArgs{ &RelayMessageArgs{
Topic: "test", Topic: "test",
Message: &pb.WakuMessage{ Message: ProtoToRPC(&pb.WakuMessage{
Payload: []byte("test"), Payload: []byte("test"),
}, }),
}, },
&reply, &reply,
) )
@ -122,7 +122,7 @@ func TestRelayGetV1Messages(t *testing.T) {
// Wait for the message to be received // Wait for the message to be received
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
var messagesReply1 RelayMessagesReply var messagesReply1 MessagesReply
err = serviceB.GetV1Messages( err = serviceB.GetV1Messages(
makeRequest(t), makeRequest(t),
&TopicArgs{"test"}, &TopicArgs{"test"},
@ -131,7 +131,7 @@ func TestRelayGetV1Messages(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Len(t, messagesReply1, 1) require.Len(t, messagesReply1, 1)
var messagesReply2 RelayMessagesReply var messagesReply2 MessagesReply
err = serviceB.GetV1Messages( err = serviceB.GetV1Messages(
makeRequest(t), makeRequest(t),
&TopicArgs{"test"}, &TopicArgs{"test"},

View File

@ -1,12 +1,8 @@
package rpc package rpc
import "github.com/waku-org/go-waku/waku/v2/protocol/pb"
type SuccessReply = bool type SuccessReply = bool
type Empty struct { type Empty struct {
} }
type MessagesReply = []*pb.WakuMessage type MessagesReply = []*RPCWakuMessage
type RelayMessagesReply = []*pb.WakuMessage

View File

@ -4,7 +4,6 @@ import (
"net/http" "net/http"
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"go.uber.org/zap" "go.uber.org/zap"
@ -34,7 +33,7 @@ type StoreMessagesArgs struct {
} }
type StoreMessagesReply struct { type StoreMessagesReply struct {
Messages []*wpb.WakuMessage `json:"messages,omitempty"` Messages []*RPCWakuMessage `json:"messages,omitempty"`
PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"` PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
@ -62,7 +61,10 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,
return nil return nil
} }
reply.Messages = res.Messages reply.Messages = make([]*RPCWakuMessage, len(res.Messages))
for i := range res.Messages {
reply.Messages[i] = ProtoToRPC(res.Messages[i])
}
reply.PagingInfo = StorePagingOptions{ reply.PagingInfo = StorePagingOptions{
PageSize: args.PagingOptions.PageSize, PageSize: args.PagingOptions.PageSize,

80
waku/v2/rpc/utils.go Normal file
View File

@ -0,0 +1,80 @@
package rpc
import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
type RateLimitProof struct {
Proof []byte `json:"proof,omitempty"`
MerkleRoot []byte `json:"merkle_root,omitempty"`
Epoch []byte `json:"epoch,omitempty"`
ShareX []byte `json:"share_x,omitempty"`
ShareY []byte `json:"share_y,omitempty"`
Nullifier []byte `json:"nullifier,omitempty"`
RlnIdentifier []byte `json:"rln_identifier,omitempty"`
}
type RPCWakuMessage struct {
Payload []byte `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp,omitempty"`
RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"`
Ephemeral bool `json:"ephemeral,omitempty"`
}
func ProtoToRPC(input *pb.WakuMessage) *RPCWakuMessage {
if input == nil {
return nil
}
rpcWakuMsg := &RPCWakuMessage{
Payload: input.Payload,
ContentTopic: input.ContentTopic,
Version: input.Version,
Timestamp: input.Timestamp,
Ephemeral: input.Ephemeral,
}
if input.RateLimitProof != nil {
rpcWakuMsg.RateLimitProof = &RateLimitProof{
Proof: input.RateLimitProof.Proof,
MerkleRoot: input.RateLimitProof.MerkleRoot,
Epoch: input.RateLimitProof.Epoch,
ShareX: input.RateLimitProof.ShareX,
ShareY: input.RateLimitProof.ShareY,
Nullifier: input.RateLimitProof.Nullifier,
RlnIdentifier: input.RateLimitProof.RlnIdentifier,
}
}
return rpcWakuMsg
}
func (r *RPCWakuMessage) toProto() *pb.WakuMessage {
if r == nil {
return nil
}
msg := &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Version: r.Version,
Timestamp: r.Timestamp,
Ephemeral: r.Ephemeral,
}
if r.RateLimitProof != nil {
msg.RateLimitProof = &pb.RateLimitProof{
Proof: r.RateLimitProof.Proof,
MerkleRoot: r.RateLimitProof.MerkleRoot,
Epoch: r.RateLimitProof.Epoch,
ShareX: r.RateLimitProof.ShareX,
ShareY: r.RateLimitProof.ShareY,
Nullifier: r.RateLimitProof.Nullifier,
RlnIdentifier: r.RateLimitProof.RlnIdentifier,
}
}
return msg
}