diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index dcbca197..860d3eda 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -560,6 +560,13 @@ var ( Destination: &options.RESTServer.RelayCacheCapacity, EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"}, }) + RESTFilterCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{ + Name: "rest-filter-cache-capacity", + Value: 30, + Usage: "Capacity of the Filter REST API message cache", + Destination: &options.RESTServer.FilterCacheCapacity, + EnvVars: []string{"WAKUNODE2_REST_FILTER_CACHE_CAPACITY"}, + }) RESTAdmin = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "rest-admin", Value: false, diff --git a/cmd/waku/main.go b/cmd/waku/main.go index c590f38d..3e74632a 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -99,6 +99,7 @@ func main() { RESTAddress, RESTPort, RESTRelayCacheCapacity, + RESTFilterCacheCapacity, RESTAdmin, PProf, } diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 3d5113ef..2129a1e5 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -444,7 +444,7 @@ func Execute(options NodeOptions) error { var restServer *rest.WakuRest if options.RESTServer.Enable { wg.Add(1) - restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, logger) + restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger) restServer.Start(ctx, &wg) } diff --git a/cmd/waku/options.go b/cmd/waku/options.go index 611f64e4..8fd48eb8 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -113,11 +113,12 @@ type RPCServerOptions struct { // RESTServerOptions are settings used to start a rest http server type RESTServerOptions struct { - Enable bool - Port int - Address string - Admin bool - RelayCacheCapacity int + Enable bool + Port int + Address string + Admin bool + RelayCacheCapacity int + FilterCacheCapacity int } // WSOptions are settings used for enabling websockets and secure websockets diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index 2b2c9d31..30666c6f 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "strings" "github.com/go-chi/chi/v5" @@ -39,25 +40,56 @@ func (r filterRequestId) MarshalJSON() ([]byte, error) { const filterv2Ping = "/filter/v2/subscriptions/{requestId}" const filterv2Subscribe = "/filter/v2/subscriptions" const filterv2SubscribeAll = "/filter/v2/subscriptions/all" +const filterv2MessagesByContentTopic = "/filter/v2/messages/{contentTopic}" +const filterv2MessagesByPubsubTopic = "/filter/v2/messages/{pubsubTopic}/{contentTopic}" // FilterService represents the REST service for Filter client type FilterService struct { - node *node.WakuNode + node *node.WakuNode + cancel context.CancelFunc log *zap.Logger + + cache *filterCache + runner *runnerService +} + +// Start starts the RelayService +func (s *FilterService) Start(ctx context.Context) { + + for _, sub := range s.node.FilterLightnode().Subscriptions() { + s.cache.subscribe(sub.ContentFilter) + } + + ctx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.runner.Start(ctx) +} + +// Stop stops the RelayService +func (r *FilterService) Stop() { + if r.cancel == nil { + return + } + r.cancel() } // NewFilterService returns an instance of FilterService -func NewFilterService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *FilterService { +func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService { s := &FilterService{ - node: node, - log: log.Named("filter"), + node: node, + log: log.Named("filter"), + cache: newFilterCache(cacheCapacity), } m.Get(filterv2Ping, s.ping) m.Post(filterv2Subscribe, s.subscribe) m.Delete(filterv2Subscribe, s.unsubscribe) m.Delete(filterv2SubscribeAll, s.unsubscribeAll) + m.Get(filterv2MessagesByContentTopic, s.getMessagesByContentTopic) + m.Get(filterv2MessagesByPubsubTopic, s.getMessagesByPubsubTopic) + + s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage) return s } @@ -123,9 +155,10 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { return } + contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...) // subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(), - protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...), + contentFilter, filter.WithRequestID(message.RequestId)) // on partial subscribe failure @@ -148,6 +181,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { } // on success + s.cache.subscribe(contentFilter) writeResponse(w, filterSubscriptionResponse{ RequestId: message.RequestId, StatusDesc: http.StatusText(http.StatusOK), @@ -170,10 +204,11 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { return } + contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...) // unsubscribe on filter result, err := s.node.FilterLightnode().Unsubscribe( req.Context(), - protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...), + contentFilter, filter.WithRequestID(message.RequestId), filter.WithPeer(peerId), ) @@ -188,6 +223,11 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { } // on success + for cTopic := range contentFilter.ContentTopics { + if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) { + s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic) + } + } writeResponse(w, filterSubscriptionResponse{ RequestId: message.RequestId, StatusDesc: s.unsubscribeGetMessage(result), @@ -285,3 +325,66 @@ func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte } return peerId } + +func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *http.Request) { + contentTopic := s.topicFromPath(w, req, "contentTopic") + if contentTopic == "" { + return + } + pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic) + if err != nil { + s.writeGetMessageErr(w, fmt.Errorf("bad content topic"), http.StatusBadRequest) + return + } + s.getMessages(w, req, pubsubTopic, contentTopic) +} + +func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *http.Request) { + contentTopic := s.topicFromPath(w, req, "contentTopic") + if contentTopic == "" { + return + } + pubsubTopic := s.topicFromPath(w, req, "pubsubTopic") + if pubsubTopic == "" { + return + } + s.getMessages(w, req, pubsubTopic, contentTopic) +} + +// 400 on invalid request +// 500 on failed subscription +// 200 on all successful unsubscribe +// unsubscribe all subscriptions for a given peer +func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) { + msgs, err := s.cache.getMessages(pubsubTopic, contentTopic) + if err != nil { + s.writeGetMessageErr(w, err, http.StatusNotFound) + return + } + writeResponse(w, msgs, http.StatusOK) +} + +func (s *FilterService) topicFromPath(w http.ResponseWriter, req *http.Request, field string) string { + cTopic := chi.URLParam(req, field) + if cTopic == "" { + errMissing := fmt.Errorf("missing %s", field) + s.writeGetMessageErr(w, errMissing, http.StatusBadRequest) + return "" + } + cTopic, err := url.QueryUnescape(cTopic) + if err != nil { + errInvalid := fmt.Errorf("invalid %s format", field) + s.writeGetMessageErr(w, errInvalid, http.StatusBadRequest) + return "" + } + return cTopic +} + +func (s *FilterService) writeGetMessageErr(w http.ResponseWriter, err error, code int) { + // write status before the body + w.WriteHeader(code) + s.log.Error("get message", zap.Error(err)) + if _, err := w.Write([]byte(err.Error())); err != nil { + s.log.Error("writing response", zap.Error(err)) + } +} diff --git a/cmd/waku/server/rest/filter_api.yaml b/cmd/waku/server/rest/filter_api.yaml index bdeef767..5f5478b5 100644 --- a/cmd/waku/server/rest/filter_api.yaml +++ b/cmd/waku/server/rest/filter_api.yaml @@ -174,6 +174,92 @@ paths: application/json: schema: $ref: '#/components/schemas/FilterSubscriptionResponse' + /filter/v2/messages/{contentTopic}: + get: # get_waku_v2_filter_v2_messages + summary: Get the latest messages on the polled content topic + description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. + operationId: getMessagesByTopic + tags: + - filter + parameters: + - in: path + name: contentTopic # Note the name is the same as in the path + required: true + schema: + type: string + description: Content topic of message + responses: + '200': + description: The latest messages on the polled topic. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterGetMessagesResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '404': + description: Not found. + content: + text/plain: + schema: + type: string + '5XX': + description: Unexpected error. + content: + text/plain: + schema: + type: string + /filter/v2/messages/{pubsubTopic}/{contentTopic}: + get: # get_waku_v2_filter_v2_messages + summary: Get the latest messages on the polled pubsub/content topic pair + description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. + operationId: getMessagesByTopic + tags: + - filter + parameters: + - in: path + name: contentTopic # Note the name is the same as in the path + required: true + schema: + type: string + description: Content topic of message + - in: path + name: pubsubTopic # Note the name is the same as in the path + required: true + schema: + type: string + description: pubsub topic of message + responses: + '200': + description: The latest messages on the polled topic. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterGetMessagesResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '404': + description: Not found. + content: + text/plain: + schema: + type: string + '5XX': + description: Unexpected error. + content: + text/plain: + schema: + type: string components: PubSubTopic: @@ -228,4 +314,24 @@ components: requestId: type: string required: - - requestId \ No newline at end of file + - requestId + + FilterGetMessagesResponse: + type: array + items: + $ref: '#/components/schemas/FilterWakuMessage' + + FilterWakuMessage: + type: object + properties: + payload: + type: string + format: byte + contentTopic: + $ref: '#/components/schemas/ContentTopic' + version: + type: number + timestamp: + type: number + required: + - payload \ No newline at end of file diff --git a/cmd/waku/server/rest/filter_cache.go b/cmd/waku/server/rest/filter_cache.go new file mode 100644 index 00000000..49fb57bf --- /dev/null +++ b/cmd/waku/server/rest/filter_cache.go @@ -0,0 +1,76 @@ +package rest + +import ( + "fmt" + "sync" + + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +type filterCache struct { + capacity int + mu sync.RWMutex + data map[string]map[string][]*pb.WakuMessage +} + +func newFilterCache(capacity int) *filterCache { + return &filterCache{ + capacity: capacity, + data: make(map[string]map[string][]*pb.WakuMessage), + } +} + +func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) { + c.mu.Lock() + defer c.mu.Unlock() + + pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter) + for pubsubTopic, contentTopics := range pubSubTopicMap { + if c.data[pubsubTopic] == nil { + c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage) + } + for _, topic := range contentTopics { + if c.data[pubsubTopic][topic] == nil { + c.data[pubsubTopic][topic] = []*pb.WakuMessage{} + } + } + } +} + +func (c *filterCache) unsubscribe(pubsubTopic string, contentTopic string) { + c.mu.Lock() + defer c.mu.Unlock() + + delete(c.data[pubsubTopic], contentTopic) +} + +func (c *filterCache) addMessage(envelope *protocol.Envelope) { + c.mu.Lock() + defer c.mu.Unlock() + + pubsubTopic := envelope.PubsubTopic() + contentTopic := envelope.Message().ContentTopic + if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil { + return + } + + // Keep a specific max number of message per topic + if len(c.data[pubsubTopic][contentTopic]) >= c.capacity { + c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:] + } + + c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], envelope.Message()) +} + +func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*pb.WakuMessage, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil { + return nil, fmt.Errorf("Not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic) + } + msgs := c.data[pubsubTopic][contentTopic] + c.data[pubsubTopic][contentTopic] = []*pb.WakuMessage{} + return msgs, nil +} diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go index fa42406b..52a5af3a 100644 --- a/cmd/waku/server/rest/filter_test.go +++ b/cmd/waku/server/rest/filter_test.go @@ -7,8 +7,10 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "strings" "testing" + "time" "github.com/go-chi/chi/v5" "github.com/libp2p/go-libp2p/core/peerstore" @@ -18,6 +20,7 @@ import ( wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -32,7 +35,7 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode { } // node2 connects to node1 -func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) { +func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) { node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter @@ -40,10 +43,8 @@ func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1) require.NoError(t, err) - if pubSubTopic != "" { - err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) - require.NoError(t, err) - } + err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), pubSubTopics) + require.NoError(t, err) return node1, node2 } @@ -51,14 +52,14 @@ func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, // test 400, 404 status code for ping rest endpoint // both requests are not successful func TestFilterPingFailure(t *testing.T) { - node1, node2 := twoFilterConnectedNodes(t, "") + node1, node2 := twoFilterConnectedNodes(t) defer func() { node1.Stop() node2.Stop() }() router := chi.NewRouter() - _ = NewFilterService(node2, router, utils.Logger()) + _ = NewFilterService(node2, router, 0, utils.Logger()) // with malformed requestId rr := httptest.NewRecorder() @@ -97,7 +98,7 @@ func TestFilterSubscribeAndPing(t *testing.T) { }() router := chi.NewRouter() - _ = NewFilterService(node2, router, utils.Logger()) + _ = NewFilterService(node2, router, 0, utils.Logger()) // create subscription to peer rr := httptest.NewRecorder() @@ -140,7 +141,7 @@ func TestFilterSubscribeAndUnsubscribe(t *testing.T) { }() router := chi.NewRouter() - _ = NewFilterService(node2, router, utils.Logger()) + _ = NewFilterService(node2, router, 0, utils.Logger()) // create subscription to peer rr := httptest.NewRecorder() @@ -191,7 +192,7 @@ func TestFilterAllUnsubscribe(t *testing.T) { }() router := chi.NewRouter() - _ = NewFilterService(node2, router, utils.Logger()) + _ = NewFilterService(node2, router, 0, utils.Logger()) // create 2 different subscription to peer for _, ct := range []string{contentTopics1, contentTopics2} { @@ -246,8 +247,150 @@ func getFilterResponse(t *testing.T, body *bytes.Buffer) filterSubscriptionRespo require.NoError(t, err) return resp } +func getMessageResponse(t *testing.T, body *bytes.Buffer) []*pb.WakuMessage { + resp := []*pb.WakuMessage{} + err := json.Unmarshal(body.Bytes(), &resp) + require.NoError(t, err) + return resp +} func toString(t *testing.T, data interface{}) string { bytes, err := json.Marshal(data) require.NoError(t, err) return string(bytes) } + +func TestFilterGetMessages(t *testing.T) { + pubsubTopic := "/waku/2/test/proto" + contentTopic := "/waku/2/app/1" + + // get nodes add connect them + generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic) + require.NoError(t, err) + node1, node2 := twoFilterConnectedNodes(t, pubsubTopic, generatedPubsubTopic) + defer func() { + node1.Stop() + node2.Stop() + }() + + // set router and start filter service + router := chi.NewRouter() + service := NewFilterService(node2, router, 2, utils.Logger()) + go service.Start(context.Background()) + defer service.Stop() + + { // create subscription so that messages are cached + for _, pubsubTopic := range []string{"", pubsubTopic} { + requestId := protocol.GenerateRequestID() + rr := httptest.NewRecorder() + reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ + RequestId: requestId, + PubsubTopic: pubsubTopic, + ContentFilters: []string{contentTopic}, + })) + req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader) + router.ServeHTTP(rr, req) + checkJSON(t, filterSubscriptionResponse{ + RequestId: requestId, + StatusDesc: "OK", + }, getFilterResponse(t, rr.Body)) + require.Equal(t, http.StatusOK, rr.Code) + } + } + + // submit messages + messageByContentTopic := []*protocol.Envelope{ + genMessage("", contentTopic), + genMessage("", contentTopic), + genMessage("", contentTopic), + } + messageByPubsubTopic := []*protocol.Envelope{ + genMessage(pubsubTopic, contentTopic), + } + for _, envelope := range append(messageByContentTopic, messageByPubsubTopic...) { + node2.Broadcaster().Submit(envelope) + } + time.Sleep(1 * time.Second) + + { // with malformed contentTopic + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape("/waku/2/wrongtopic")), + nil, + ) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Equal(t, "bad content topic", rr.Body.String()) + } + + { // with check if the cache is working properly + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("/filter/v2/messages/%s", url.QueryEscape(contentTopic)), + nil, + ) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + checkJSON(t, toMessage(messageByContentTopic[1:]), getMessageResponse(t, rr.Body)) + } + + { // check if pubsubTopic is present in the url + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("/filter/v2/messages//%s", url.QueryEscape(contentTopic)), + nil, + ) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Equal(t, "missing pubsubTopic", rr.Body.String()) + } + + { // check messages by pubsub/contentTopic pair + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)), + nil, + ) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + checkJSON(t, toMessage(messageByPubsubTopic), getMessageResponse(t, rr.Body)) + } + + { // check if pubsubTopic/contentTOpic is subscribed or not. + rr := httptest.NewRecorder() + notSubscibredPubsubTopic := "/waku/2/test2/proto" + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("/filter/v2/messages/%s/%s", url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)), + nil, + ) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) + require.Equal(t, + fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), + rr.Body.String(), + ) + } +} + +func toMessage(envs []*protocol.Envelope) []*pb.WakuMessage { + msgs := make([]*pb.WakuMessage, len(envs)) + for i, env := range envs { + msgs[i] = env.Message() + } + return msgs +} + +func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope { + if pubsubTopic == "" { + pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(contentTopic) + } + return protocol.NewEnvelope( + &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: contentTopic, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + }, + 0, + pubsubTopic, + ) +} diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index c7e8275b..98c5437c 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -18,10 +18,11 @@ type WakuRest struct { log *zap.Logger - relayService *RelayService + relayService *RelayService + filterService *FilterService } -func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity int, log *zap.Logger) *WakuRest { +func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity, filterCacheCapacity int, log *zap.Logger) *WakuRest { wrpc := new(WakuRest) wrpc.log = log.Named("rest") @@ -61,7 +62,11 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool } if node.FilterLightnode() != nil { - _ = NewFilterService(node, mux, log) + filterService := NewFilterService(node, mux, filterCacheCapacity, log) + server.RegisterOnShutdown(func() { + filterService.Stop() + }) + wrpc.filterService = filterService } return wrpc @@ -73,6 +78,9 @@ func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) { if r.node.Relay() != nil { go r.relayService.Start(ctx) } + if r.node.FilterLightnode() != nil { + go r.filterService.Start(ctx) + } go func() { _ = r.server.ListenAndServe() diff --git a/cmd/waku/server/rest/waku_rest_test.go b/cmd/waku/server/rest/waku_rest_test.go index b6d3bd7a..d27810b4 100644 --- a/cmd/waku/server/rest/waku_rest_test.go +++ b/cmd/waku/server/rest/waku_rest_test.go @@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) { n, err := node.New(options) require.NoError(t, err) - rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, utils.Logger()) + rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, 0, utils.Logger()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index a014d095..1dfb6e72 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -482,6 +482,11 @@ func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetai return subs } +func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) bool { + return wf.subscriptions.IsListening(pubsubTopic, contentTopic) + +} + // UnsubscribeWithSubscription is used to close a particular subscription // If there are no more subscriptions matching the passed [peer, contentFilter] pair, // server unsubscribe is also performed