fix(rest): use custom struct for messages instead of protobuffer (#888)

This commit is contained in:
richΛrd 2023-11-10 14:31:36 -04:00 committed by GitHub
parent be9a2cce10
commit b6d9e3d4be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 168 additions and 57 deletions

View File

@ -76,10 +76,12 @@ func (r *FilterService) Stop() {
// NewFilterService returns an instance of FilterService // NewFilterService returns an instance of FilterService
func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService { func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService {
logger := log.Named("filter")
s := &FilterService{ s := &FilterService{
node: node, node: node,
log: log.Named("filter"), log: logger,
cache: newFilterCache(cacheCapacity), cache: newFilterCache(cacheCapacity, logger),
} }
m.Get(filterv2Ping, s.ping) m.Get(filterv2Ping, s.ping)
@ -130,9 +132,6 @@ func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
}, http.StatusOK) }, http.StatusOK)
} }
///////////////////////
///////////////////////
// same for FilterUnsubscribeRequest // same for FilterUnsubscribeRequest
type filterSubscriptionRequest struct { type filterSubscriptionRequest struct {
RequestId filterRequestId `json:"requestId"` RequestId filterRequestId `json:"requestId"`

View File

@ -5,19 +5,21 @@ import (
"sync" "sync"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "go.uber.org/zap"
) )
type filterCache struct { type filterCache struct {
capacity int capacity int
mu sync.RWMutex mu sync.RWMutex
data map[string]map[string][]*pb.WakuMessage log *zap.Logger
data map[string]map[string][]*RestWakuMessage
} }
func newFilterCache(capacity int) *filterCache { func newFilterCache(capacity int, log *zap.Logger) *filterCache {
return &filterCache{ return &filterCache{
capacity: capacity, capacity: capacity,
data: make(map[string]map[string][]*pb.WakuMessage), data: make(map[string]map[string][]*RestWakuMessage),
log: log.Named("cache"),
} }
} }
@ -28,11 +30,11 @@ func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter) pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
for pubsubTopic, contentTopics := range pubSubTopicMap { for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.data[pubsubTopic] == nil { if c.data[pubsubTopic] == nil {
c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage) c.data[pubsubTopic] = make(map[string][]*RestWakuMessage)
} }
for _, topic := range contentTopics { for _, topic := range contentTopics {
if c.data[pubsubTopic][topic] == nil { if c.data[pubsubTopic][topic] == nil {
c.data[pubsubTopic][topic] = []*pb.WakuMessage{} c.data[pubsubTopic][topic] = []*RestWakuMessage{}
} }
} }
} }
@ -60,10 +62,16 @@ func (c *filterCache) addMessage(envelope *protocol.Envelope) {
c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:] c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:]
} }
c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], envelope.Message()) message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
c.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
return
}
c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], message)
} }
func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*pb.WakuMessage, error) { func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -71,6 +79,6 @@ func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*p
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic) return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
} }
msgs := c.data[pubsubTopic][contentTopic] msgs := c.data[pubsubTopic][contentTopic]
c.data[pubsubTopic][contentTopic] = []*pb.WakuMessage{} c.data[pubsubTopic][contentTopic] = []*RestWakuMessage{}
return msgs, nil return msgs, nil
} }

View File

@ -8,7 +8,6 @@ import (
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -38,34 +37,39 @@ func (msg lightpushRequest) Check() error {
} }
type lightpushRequest struct { type lightpushRequest struct {
PubSubTopic string `json:"pubsubTopic"` PubSubTopic string `json:"pubsubTopic"`
Message *pb.WakuMessage `json:"message"` Message *RestWakuMessage `json:"message"`
} }
// handled error codes are 200, 400, 500, 503 // handled error codes are 200, 400, 500, 503
func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Request) { func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Request) {
msg := &lightpushRequest{} request := &lightpushRequest{}
decoder := json.NewDecoder(req.Body) decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(msg); err != nil { if err := decoder.Decode(request); err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
defer req.Body.Close() defer req.Body.Close()
if err := msg.Check(); err != nil { if err := request.Check(); err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte(err.Error())) _, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err)) serv.log.Error("writing response", zap.Error(err))
return return
} }
//
if serv.node.Lightpush() == nil { if serv.node.Lightpush() == nil {
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
return return
} }
_, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic)) message, err := request.Message.ToProto()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
_, err = serv.node.Lightpush().Publish(req.Context(), message, lightpush.WithPubSubTopic(request.PubSubTopic))
if err != nil { if err != nil {
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
_, err = w.Write([]byte(err.Error())) _, err = w.Write([]byte(err.Error()))

View File

@ -14,7 +14,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
) )
@ -46,7 +45,7 @@ func TestLightpushMessagev1(t *testing.T) {
msg := lightpushRequest{ msg := lightpushRequest{
PubSubTopic: pubSubTopic, PubSubTopic: pubSubTopic,
Message: &pb.WakuMessage{ Message: &RestWakuMessage{
Payload: []byte{1, 2, 3}, Payload: []byte{1, 2, 3},
ContentTopic: "abc", ContentTopic: "abc",
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),

View File

@ -0,0 +1,46 @@
package rest
import (
"errors"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
type RestWakuMessage struct {
Payload server.Base64URLByte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}
func (r *RestWakuMessage) FromProto(input *pb.WakuMessage) error {
if err := input.Validate(); err != nil {
return err
}
r.Payload = input.Payload
r.ContentTopic = input.ContentTopic
r.Timestamp = input.Timestamp
r.Version = input.Version
r.Meta = input.Meta
return nil
}
func (r *RestWakuMessage) ToProto() (*pb.WakuMessage, error) {
if r == nil {
return nil, errors.New("wakumessage is missing")
}
msg := &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Version: r.Version,
Timestamp: r.Timestamp,
Meta: r.Meta,
}
return msg, nil
}

View File

@ -9,7 +9,6 @@ import (
"github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -124,9 +123,9 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
r.log.Error("writing response", zap.Error(err)) r.log.Error("writing response", zap.Error(err))
return return
} }
var response []*pb.WakuMessage var response []*RestWakuMessage
select { select {
case msg, open := <-sub.Ch: case envelope, open := <-sub.Ch:
if !open { if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
@ -136,7 +135,14 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
} }
return return
} }
response = append(response, msg.Message())
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
default: default:
break break
} }
@ -150,9 +156,9 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
return return
} }
var message *pb.WakuMessage var restMessage *RestWakuMessage
decoder := json.NewDecoder(req.Body) decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&message); err != nil { if err := decoder.Decode(&restMessage); err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
@ -162,12 +168,18 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic = relay.DefaultWakuTopic topic = relay.DefaultWakuTopic
} }
message, err := restMessage.ToProto()
if err != nil {
writeErrOrResponse(w, err, nil)
return
}
if err := server.AppendRLNProof(r.node, message); err != nil { if err := server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil) writeErrOrResponse(w, err, nil)
return return
} }
_, err := r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1))) _, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1)))
if err != nil { if err != nil {
r.log.Error("publishing message", zap.Error(err)) r.log.Error("publishing message", zap.Error(err))
} }
@ -227,10 +239,15 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques
r.log.Error("writing response", zap.Error(err)) r.log.Error("writing response", zap.Error(err))
return return
} }
var response []*pb.WakuMessage var response []*RestWakuMessage
select { select {
case msg := <-sub.Ch: case envelope := <-sub.Ch:
response = append(response, msg.Message()) message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
default: default:
break break
} }
@ -240,15 +257,21 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques
func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) { func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) {
var message *pb.WakuMessage var restMessage *RestWakuMessage
decoder := json.NewDecoder(req.Body) decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&message); err != nil { if err := decoder.Decode(&restMessage); err != nil {
r.log.Error("decoding message failure", zap.Error(err)) r.log.Error("decoding message failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
defer req.Body.Close() defer req.Body.Close()
var err error
message, err := restMessage.ToProto()
if err != nil {
writeErrOrResponse(w, err, nil)
return
}
if err = server.AppendRLNProof(r.node, message); err != nil { if err = server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil) writeErrOrResponse(w, err, nil)
return return

View File

@ -36,7 +36,7 @@ func TestPostV1Message(t *testing.T) {
router := chi.NewRouter() router := chi.NewRouter()
_ = makeRelayService(t, router) _ = makeRelayService(t, router)
msg := &pb.WakuMessage{ msg := &RestWakuMessage{
Payload: []byte{1, 2, 3}, Payload: []byte{1, 2, 3},
ContentTopic: "abc", ContentTopic: "abc",
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),
@ -127,7 +127,7 @@ func TestRelayGetV1Messages(t *testing.T) {
// Wait for the subscription to be started // Wait for the subscription to be started
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
msg := &pb.WakuMessage{ msg := &RestWakuMessage{
Payload: []byte{1, 2, 3}, Payload: []byte{1, 2, 3},
ContentTopic: "test", ContentTopic: "test",
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),
@ -164,7 +164,7 @@ func TestPostAutoV1Message(t *testing.T) {
router := chi.NewRouter() router := chi.NewRouter()
_ = makeRelayService(t, router) _ = makeRelayService(t, router)
msg := &pb.WakuMessage{ msg := &RestWakuMessage{
Payload: []byte{1, 2, 3}, Payload: []byte{1, 2, 3},
ContentTopic: "/toychat/1/huilong/proto", ContentTopic: "/toychat/1/huilong/proto",
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),
@ -262,7 +262,7 @@ func TestRelayGetV1AutoMessages(t *testing.T) {
// Wait for the subscription to be started // Wait for the subscription to be started
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
msg := &pb.WakuMessage{ msg := &RestWakuMessage{
Payload: []byte{1, 2, 3}, Payload: []byte{1, 2, 3},
ContentTopic: cTopic1, ContentTopic: cTopic1,
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),

View File

@ -30,18 +30,18 @@ type StoreResponse struct {
} }
type HistoryCursor struct { type HistoryCursor struct {
PubsubTopic string `json:"pubsub_topic"` PubsubTopic string `json:"pubsubTopic"`
SenderTime string `json:"sender_time"` SenderTime string `json:"senderTime"`
StoreTime string `json:"store_time"` StoreTime string `json:"storeTime"`
Digest []byte `json:"digest"` Digest []byte `json:"digest"`
} }
type StoreWakuMessage struct { type StoreWakuMessage struct {
Payload []byte `json:"payload"` Payload []byte `json:"payload"`
ContentTopic string `json:"content_topic"` ContentTopic string `json:"contentTopic"`
Version uint32 `json:"version"` Version *uint32 `json:"version,omitempty"`
Timestamp int64 `json:"timestamp"` Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta"` Meta []byte `json:"meta,omitempty"`
} }
const routeStoreMessagesV1 = "/store/v1/messages" const routeStoreMessagesV1 = "/store/v1/messages"
@ -180,8 +180,8 @@ func toStoreResponse(result *store.Result) StoreResponse {
response.Messages = append(response.Messages, StoreWakuMessage{ response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload, Payload: m.Payload,
ContentTopic: m.ContentTopic, ContentTopic: m.ContentTopic,
Version: m.GetVersion(), Version: m.Version,
Timestamp: m.GetTimestamp(), Timestamp: m.Timestamp,
Meta: m.Meta, Meta: m.Meta,
}) })
} }

View File

@ -3,6 +3,7 @@ package rpc
import ( import (
"errors" "errors"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb" rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb"
@ -20,12 +21,12 @@ type RateLimitProof struct {
} }
type RPCWakuMessage struct { type RPCWakuMessage struct {
Payload Base64URLByte `json:"payload,omitempty"` Payload server.Base64URLByte `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"` ContentTopic string `json:"contentTopic,omitempty"`
Version uint32 `json:"version"` Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp,omitempty"` Timestamp int64 `json:"timestamp,omitempty"`
RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"` RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"`
Ephemeral bool `json:"ephemeral,omitempty"` Ephemeral bool `json:"ephemeral,omitempty"`
} }
func ProtoToRPC(input *pb.WakuMessage) (*RPCWakuMessage, error) { func ProtoToRPC(input *pb.WakuMessage) (*RPCWakuMessage, error) {

View File

@ -1,6 +1,9 @@
package server package server
import ( import (
"encoding/base64"
"strings"
"github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
@ -17,3 +20,31 @@ func IsWakuProtocol(protocol protocol.ID) bool {
protocol == lightpush.LightPushID_v20beta1 || protocol == lightpush.LightPushID_v20beta1 ||
protocol == store.StoreID_v20beta4 protocol == store.StoreID_v20beta4
} }
type Base64URLByte []byte
// UnmarshalText is used by json.Unmarshal to decode both url-safe and standard
// base64 encoded strings with and without padding
func (h *Base64URLByte) UnmarshalText(b []byte) error {
inputValue := ""
if b != nil {
inputValue = string(b)
}
enc := base64.StdEncoding
if strings.ContainsAny(inputValue, "-_") {
enc = base64.URLEncoding
}
if len(inputValue)%4 != 0 {
enc = enc.WithPadding(base64.NoPadding)
}
decodedBytes, err := enc.DecodeString(inputValue)
if err != nil {
return err
}
*h = decodedBytes
return nil
}