feat(rest-filterv2): get message (#856)

* feat: add getMessage endpoint

* test: getMessage filter v2
This commit is contained in:
harsh jain 2023-11-04 14:24:20 +07:00 committed by GitHub
parent a0bc53c679
commit 532a04013f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 477 additions and 27 deletions

View File

@ -560,6 +560,13 @@ var (
Destination: &options.RESTServer.RelayCacheCapacity,
EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"},
})
RESTFilterCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "rest-filter-cache-capacity",
Value: 30,
Usage: "Capacity of the Filter REST API message cache",
Destination: &options.RESTServer.FilterCacheCapacity,
EnvVars: []string{"WAKUNODE2_REST_FILTER_CACHE_CAPACITY"},
})
RESTAdmin = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "rest-admin",
Value: false,

View File

@ -99,6 +99,7 @@ func main() {
RESTAddress,
RESTPort,
RESTRelayCacheCapacity,
RESTFilterCacheCapacity,
RESTAdmin,
PProf,
}

View File

@ -444,7 +444,7 @@ func Execute(options NodeOptions) error {
var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, logger)
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger)
restServer.Start(ctx, &wg)
}

View File

@ -118,6 +118,7 @@ type RESTServerOptions struct {
Address string
Admin bool
RelayCacheCapacity int
FilterCacheCapacity int
}
// WSOptions are settings used for enabling websockets and secure websockets

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"github.com/go-chi/chi/v5"
@ -39,25 +40,56 @@ func (r filterRequestId) MarshalJSON() ([]byte, error) {
const filterv2Ping = "/filter/v2/subscriptions/{requestId}"
const filterv2Subscribe = "/filter/v2/subscriptions"
const filterv2SubscribeAll = "/filter/v2/subscriptions/all"
const filterv2MessagesByContentTopic = "/filter/v2/messages/{contentTopic}"
const filterv2MessagesByPubsubTopic = "/filter/v2/messages/{pubsubTopic}/{contentTopic}"
// FilterService represents the REST service for Filter client
type FilterService struct {
node *node.WakuNode
cancel context.CancelFunc
log *zap.Logger
cache *filterCache
runner *runnerService
}
// Start starts the RelayService
func (s *FilterService) Start(ctx context.Context) {
for _, sub := range s.node.FilterLightnode().Subscriptions() {
s.cache.subscribe(sub.ContentFilter)
}
ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
s.runner.Start(ctx)
}
// Stop stops the RelayService
func (r *FilterService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
}
// NewFilterService returns an instance of FilterService
func NewFilterService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *FilterService {
func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService {
s := &FilterService{
node: node,
log: log.Named("filter"),
cache: newFilterCache(cacheCapacity),
}
m.Get(filterv2Ping, s.ping)
m.Post(filterv2Subscribe, s.subscribe)
m.Delete(filterv2Subscribe, s.unsubscribe)
m.Delete(filterv2SubscribeAll, s.unsubscribeAll)
m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic)
m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic)
s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage)
return s
}
@ -123,9 +155,10 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
return
}
contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...)
//
subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(),
protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...),
contentFilter,
filter.WithRequestID(message.RequestId))
// on partial subscribe failure
@ -148,6 +181,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
}
// on success
s.cache.subscribe(contentFilter)
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
StatusDesc: http.StatusText(http.StatusOK),
@ -170,10 +204,11 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
return
}
contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...)
// unsubscribe on filter
result, err := s.node.FilterLightnode().Unsubscribe(
req.Context(),
protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...),
contentFilter,
filter.WithRequestID(message.RequestId),
filter.WithPeer(peerId),
)
@ -188,6 +223,11 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
}
// on success
for cTopic := range contentFilter.ContentTopics {
if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) {
s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic)
}
}
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
StatusDesc: s.unsubscribeGetMessage(result),
@ -285,3 +325,66 @@ func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte
}
return peerId
}
func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *http.Request) {
contentTopic := s.topicFromPath(w, req, "contentTopic")
if contentTopic == "" {
return
}
pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
s.writeGetMessageErr(w, fmt.Errorf("bad content topic"), http.StatusBadRequest)
return
}
s.getMessages(w, req, pubsubTopic, contentTopic)
}
func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *http.Request) {
contentTopic := s.topicFromPath(w, req, "contentTopic")
if contentTopic == "" {
return
}
pubsubTopic := s.topicFromPath(w, req, "pubsubTopic")
if pubsubTopic == "" {
return
}
s.getMessages(w, req, pubsubTopic, contentTopic)
}
// 400 on invalid request
// 500 on failed subscription
// 200 on all successful unsubscribe
// unsubscribe all subscriptions for a given peer
func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) {
msgs, err := s.cache.getMessages(pubsubTopic, contentTopic)
if err != nil {
s.writeGetMessageErr(w, err, http.StatusNotFound)
return
}
writeResponse(w, msgs, http.StatusOK)
}
func (s *FilterService) topicFromPath(w http.ResponseWriter, req *http.Request, field string) string {
cTopic := chi.URLParam(req, field)
if cTopic == "" {
errMissing := fmt.Errorf("missing %s", field)
s.writeGetMessageErr(w, errMissing, http.StatusBadRequest)
return ""
}
cTopic, err := url.QueryUnescape(cTopic)
if err != nil {
errInvalid := fmt.Errorf("invalid %s format", field)
s.writeGetMessageErr(w, errInvalid, http.StatusBadRequest)
return ""
}
return cTopic
}
func (s *FilterService) writeGetMessageErr(w http.ResponseWriter, err error, code int) {
// write status before the body
w.WriteHeader(code)
s.log.Error("get message", zap.Error(err))
if _, err := w.Write([]byte(err.Error())); err != nil {
s.log.Error("writing response", zap.Error(err))
}
}

View File

@ -174,6 +174,92 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/messages/{contentTopic}:
get: # get_waku_v2_filter_v2_messages
summary: Get the latest messages on the polled content topic
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- filter
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: Content topic of message
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
/filter/v2/messages/{pubsubTopic}/{contentTopic}:
get: # get_waku_v2_filter_v2_messages
summary: Get the latest messages on the polled pubsub/content topic pair
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- filter
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: Content topic of message
- in: path
name: pubsubTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: pubsub topic of message
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
components:
PubSubTopic:
@ -229,3 +315,23 @@ components:
type: string
required:
- requestId
FilterGetMessagesResponse:
type: array
items:
$ref: '#/components/schemas/FilterWakuMessage'
FilterWakuMessage:
type: object
properties:
payload:
type: string
format: byte
contentTopic:
$ref: '#/components/schemas/ContentTopic'
version:
type: number
timestamp:
type: number
required:
- payload

View File

@ -0,0 +1,76 @@
package rest
import (
"fmt"
"sync"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
type filterCache struct {
capacity int
mu sync.RWMutex
data map[string]map[string][]*pb.WakuMessage
}
func newFilterCache(capacity int) *filterCache {
return &filterCache{
capacity: capacity,
data: make(map[string]map[string][]*pb.WakuMessage),
}
}
func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
c.mu.Lock()
defer c.mu.Unlock()
pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.data[pubsubTopic] == nil {
c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage)
}
for _, topic := range contentTopics {
if c.data[pubsubTopic][topic] == nil {
c.data[pubsubTopic][topic] = []*pb.WakuMessage{}
}
}
}
}
func (c *filterCache) unsubscribe(pubsubTopic string, contentTopic string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data[pubsubTopic], contentTopic)
}
func (c *filterCache) addMessage(envelope *protocol.Envelope) {
c.mu.Lock()
defer c.mu.Unlock()
pubsubTopic := envelope.PubsubTopic()
contentTopic := envelope.Message().ContentTopic
if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil {
return
}
// Keep a specific max number of message per topic
if len(c.data[pubsubTopic][contentTopic]) >= c.capacity {
c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:]
}
c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], envelope.Message())
}
func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*pb.WakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil {
return nil, fmt.Errorf("Not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
}
msgs := c.data[pubsubTopic][contentTopic]
c.data[pubsubTopic][contentTopic] = []*pb.WakuMessage{}
return msgs, nil
}

View File

@ -7,8 +7,10 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peerstore"
@ -18,6 +20,7 @@ import (
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -32,7 +35,7 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
}
// node2 connects to node1
func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) {
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
@ -40,10 +43,8 @@ func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode,
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1)
require.NoError(t, err)
if pubSubTopic != "" {
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic})
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), pubSubTopics)
require.NoError(t, err)
}
return node1, node2
}
@ -51,14 +52,14 @@ func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode,
// test 400, 404 status code for ping rest endpoint
// both requests are not successful
func TestFilterPingFailure(t *testing.T) {
node1, node2 := twoFilterConnectedNodes(t, "")
node1, node2 := twoFilterConnectedNodes(t)
defer func() {
node1.Stop()
node2.Stop()
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, utils.Logger())
_ = NewFilterService(node2, router, 0, utils.Logger())
// with malformed requestId
rr := httptest.NewRecorder()
@ -97,7 +98,7 @@ func TestFilterSubscribeAndPing(t *testing.T) {
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, utils.Logger())
_ = NewFilterService(node2, router, 0, utils.Logger())
// create subscription to peer
rr := httptest.NewRecorder()
@ -140,7 +141,7 @@ func TestFilterSubscribeAndUnsubscribe(t *testing.T) {
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, utils.Logger())
_ = NewFilterService(node2, router, 0, utils.Logger())
// create subscription to peer
rr := httptest.NewRecorder()
@ -191,7 +192,7 @@ func TestFilterAllUnsubscribe(t *testing.T) {
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, utils.Logger())
_ = NewFilterService(node2, router, 0, utils.Logger())
// create 2 different subscription to peer
for _, ct := range []string{contentTopics1, contentTopics2} {
@ -246,8 +247,150 @@ func getFilterResponse(t *testing.T, body *bytes.Buffer) filterSubscriptionRespo
require.NoError(t, err)
return resp
}
func getMessageResponse(t *testing.T, body *bytes.Buffer) []*pb.WakuMessage {
resp := []*pb.WakuMessage{}
err := json.Unmarshal(body.Bytes(), &resp)
require.NoError(t, err)
return resp
}
func toString(t *testing.T, data interface{}) string {
bytes, err := json.Marshal(data)
require.NoError(t, err)
return string(bytes)
}
func TestFilterGetMessages(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopic := "/waku/2/app/1"
// get nodes add connect them
generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
require.NoError(t, err)
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic, generatedPubsubTopic)
defer func() {
node1.Stop()
node2.Stop()
}()
// set router and start filter service
router := chi.NewRouter()
service := NewFilterService(node2, router, 2, utils.Logger())
go service.Start(context.Background())
defer service.Stop()
{ // create subscription so that messages are cached
for _, pubsubTopic := range []string{"", pubsubTopic} {
requestId := protocol.GenerateRequestID()
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestId: requestId,
PubsubTopic: pubsubTopic,
ContentFilters: []string{contentTopic},
}))
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestId: requestId,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
}
}
// submit messages
messageByContentTopic := []*protocol.Envelope{
genMessage("", contentTopic),
genMessage("", contentTopic),
genMessage("", contentTopic),
}
messageByPubsubTopic := []*protocol.Envelope{
genMessage(pubsubTopic, contentTopic),
}
for _, envelope := range append(messageByContentTopic, messageByPubsubTopic...) {
node2.Broadcaster().Submit(envelope)
}
time.Sleep(1 * time.Second)
{ // with malformed contentTopic
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape("/waku/2/wrongtopic")),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "bad content topic", rr.Body.String())
}
{ // with check if the cache is working properly
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
checkJSON(t, toMessage(messageByContentTopic[1:]), getMessageResponse(t, rr.Body))
}
{ // check if pubsubTopic is present in the url
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages//%s", url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "missing pubsubTopic", rr.Body.String())
}
{ // check messages by pubsub/contentTopic pair
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
checkJSON(t, toMessage(messageByPubsubTopic), getMessageResponse(t, rr.Body))
}
{ // check if pubsubTopic/contentTOpic is subscribed or not.
rr := httptest.NewRecorder()
notSubscibredPubsubTopic := "/waku/2/test2/proto"
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t,
fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
rr.Body.String(),
)
}
}
func toMessage(envs []*protocol.Envelope) []*pb.WakuMessage {
msgs := make([]*pb.WakuMessage, len(envs))
for i, env := range envs {
msgs[i] = env.Message()
}
return msgs
}
func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope {
if pubsubTopic == "" {
pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(contentTopic)
}
return protocol.NewEnvelope(
&pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: contentTopic,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
},
0,
pubsubTopic,
)
}

View File

@ -19,9 +19,10 @@ type WakuRest struct {
log *zap.Logger
relayService *RelayService
filterService *FilterService
}
func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity int, log *zap.Logger) *WakuRest {
func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity, filterCacheCapacity int, log *zap.Logger) *WakuRest {
wrpc := new(WakuRest)
wrpc.log = log.Named("rest")
@ -61,7 +62,11 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
}
if node.FilterLightnode() != nil {
_ = NewFilterService(node, mux, log)
filterService := NewFilterService(node, mux, filterCacheCapacity, log)
server.RegisterOnShutdown(func() {
filterService.Stop()
})
wrpc.filterService = filterService
}
return wrpc
@ -73,6 +78,9 @@ func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) {
if r.node.Relay() != nil {
go r.relayService.Start(ctx)
}
if r.node.FilterLightnode() != nil {
go r.filterService.Start(ctx)
}
go func() {
_ = r.server.ListenAndServe()

View File

@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) {
n, err := node.New(options)
require.NoError(t, err)
rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, utils.Logger())
rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, 0, utils.Logger())
require.NotNil(t, rpc.server)
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
}

View File

@ -482,6 +482,11 @@ func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetai
return subs
}
func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) bool {
return wf.subscriptions.IsListening(pubsubTopic, contentTopic)
}
// UnsubscribeWithSubscription is used to close a particular subscription
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed