mirror of https://github.com/status-im/go-waku.git
feat: add filter v2 rpc (#798)
* feat: add filter v2 rpc ping, subscribe/unsubscribe and unsubscribeAll. * test(filterRest): pingFailure, subscribe-ping, unsubscribe and unsubscribeAll
This commit is contained in:
parent
c58d0f51e4
commit
0868f5d4dd
|
@ -0,0 +1,284 @@
|
|||
package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type filterRequestId []byte
|
||||
|
||||
func (r *filterRequestId) UnmarshalJSON(bodyBytes []byte) error {
|
||||
body := strings.Trim(string(bodyBytes), `"`)
|
||||
reqId, err := hex.DecodeString(body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*r = reqId
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r filterRequestId) String() string {
|
||||
return hex.EncodeToString(r)
|
||||
}
|
||||
func (r filterRequestId) MarshalJSON() ([]byte, error) {
|
||||
return []byte(fmt.Sprintf(`"%s"`, r.String())), nil
|
||||
}
|
||||
|
||||
const filterv2Ping = "/filter/v2/subscriptions/{requestId}"
|
||||
const filterv2Subscribe = "/filter/v2/subscriptions"
|
||||
const filterv2SubscribeAll = "/filter/v2/subscriptions/all"
|
||||
|
||||
// FilterService represents the REST service for Filter client
|
||||
type FilterService struct {
|
||||
node *node.WakuNode
|
||||
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
// NewFilterService returns an instance of FilterService
|
||||
func NewFilterService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *FilterService {
|
||||
s := &FilterService{
|
||||
node: node,
|
||||
log: log.Named("filter"),
|
||||
}
|
||||
|
||||
m.Get(filterv2Ping, s.ping)
|
||||
m.Post(filterv2Subscribe, s.subscribe)
|
||||
m.Delete(filterv2Subscribe, s.unsubscribe)
|
||||
m.Delete(filterv2SubscribeAll, s.unsubscribeAll)
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// 400 for bad requestId
|
||||
// 404 when request failed or no suitable peers
|
||||
// 200 when ping successful
|
||||
func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
|
||||
var requestId filterRequestId
|
||||
if err := requestId.UnmarshalJSON([]byte(chi.URLParam(req, "requestId"))); err != nil {
|
||||
s.log.Error("bad request id", zap.Error(err))
|
||||
writeResponse(w, &filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "bad request id",
|
||||
}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// selecting random peer that supports filter protocol
|
||||
peerId := s.getRandomFilterPeer(req.Context(), requestId, w)
|
||||
if peerId == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId(requestId)); err != nil {
|
||||
s.log.Error("ping request failed", zap.Error(err))
|
||||
writeResponse(w, &filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "ping request failed",
|
||||
}, http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
// success
|
||||
writeResponse(w, &filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: http.StatusText(http.StatusOK),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
///////////////////////
|
||||
///////////////////////
|
||||
|
||||
// same for FilterUnsubscribeRequest
|
||||
type filterSubscriptionRequest struct {
|
||||
RequestId filterRequestId `json:"requestId"`
|
||||
ContentFilters []string `json:"contentFilters"`
|
||||
PubsubTopic string `json:"pubsubTopic"`
|
||||
}
|
||||
|
||||
type filterSubscriptionResponse struct {
|
||||
RequestId filterRequestId `json:"requestId"`
|
||||
StatusDesc string `json:"statusDesc"`
|
||||
}
|
||||
|
||||
// 400 on invalid request
|
||||
// 404 on failed subscription
|
||||
// 200 on single returned successful subscription
|
||||
// NOTE: subscribe on filter client randomly selects a peer if missing for given pubSubTopic
|
||||
func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
|
||||
message := filterSubscriptionRequest{}
|
||||
if !s.readBody(w, req, &message) {
|
||||
return
|
||||
}
|
||||
|
||||
//
|
||||
subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(),
|
||||
protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...),
|
||||
filter.WithRequestID(message.RequestId))
|
||||
|
||||
// on partial subscribe failure
|
||||
if len(subscriptions) > 0 && err != nil {
|
||||
s.log.Error("partial subscribe failed", zap.Error(err))
|
||||
// on partial failure
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: err.Error(),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
s.log.Error("subscription failed", zap.Error(err))
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: "subscription failed",
|
||||
}, http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
// on success
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: http.StatusText(http.StatusOK),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
// 400 on invalid request
|
||||
// 500 on failed subscription
|
||||
// 200 on successful unsubscribe
|
||||
// NOTE: unsubscribe on filter client will remove subscription from all peers with matching pubSubTopic, if peerId is not provided
|
||||
// to match functionality in nwaku, we will randomly select a peer that supports filter protocol.
|
||||
func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
|
||||
message := filterSubscriptionRequest{} // as pubSubTopics can also be present
|
||||
if !s.readBody(w, req, &message) {
|
||||
return
|
||||
}
|
||||
|
||||
peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w)
|
||||
if peerId == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// unsubscribe on filter
|
||||
errCh, err := s.node.FilterLightnode().Unsubscribe(
|
||||
req.Context(),
|
||||
protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...),
|
||||
filter.WithRequestID(message.RequestId),
|
||||
filter.WithPeer(peerId),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
s.log.Error("unsubscribe failed", zap.Error(err))
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: err.Error(),
|
||||
}, http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
// on success
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: s.unsubscribeGetMessage(errCh),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func (s *FilterService) unsubscribeGetMessage(ch <-chan filter.WakuFilterPushResult) string {
|
||||
var peerIds string
|
||||
ind := 0
|
||||
for entry := range ch {
|
||||
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
|
||||
if ind != 0 {
|
||||
peerIds += ", "
|
||||
}
|
||||
peerIds += entry.PeerID.String()
|
||||
ind++
|
||||
}
|
||||
if peerIds != "" {
|
||||
return "can't unsubscribe from " + peerIds
|
||||
}
|
||||
return http.StatusText(http.StatusOK)
|
||||
}
|
||||
|
||||
// ///////////////////////
|
||||
// ///////////////////////
|
||||
type filterUnsubscribeAllRequest struct {
|
||||
RequestId filterRequestId `json:"requestId"`
|
||||
}
|
||||
|
||||
func (s *FilterService) readBody(w http.ResponseWriter, req *http.Request, message interface{}) bool {
|
||||
decoder := json.NewDecoder(req.Body)
|
||||
if err := decoder.Decode(message); err != nil {
|
||||
s.log.Error("bad request", zap.Error(err))
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return false
|
||||
}
|
||||
defer req.Body.Close()
|
||||
return true
|
||||
}
|
||||
|
||||
// 400 on invalid request
|
||||
// 500 on failed subscription
|
||||
// 200 on all successful unsubscribe
|
||||
// unsubscribe all subscriptions for a given peer
|
||||
func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) {
|
||||
message := filterUnsubscribeAllRequest{}
|
||||
if !s.readBody(w, req, &message) {
|
||||
return
|
||||
}
|
||||
|
||||
peerId := s.getRandomFilterPeer(req.Context(), message.RequestId, w)
|
||||
if peerId == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// unsubscribe all subscriptions for a given peer
|
||||
errCh, err := s.node.FilterLightnode().UnsubscribeAll(
|
||||
req.Context(),
|
||||
filter.WithRequestID(message.RequestId),
|
||||
filter.WithPeer(peerId),
|
||||
)
|
||||
if err != nil {
|
||||
s.log.Error("unsubscribeAll failed", zap.Error(err))
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: err.Error(),
|
||||
}, http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
// on success
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: s.unsubscribeGetMessage(errCh),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId []byte, w http.ResponseWriter) peer.ID {
|
||||
// selecting random peer that supports filter protocol
|
||||
peerId, err := s.node.PeerManager().SelectPeer(peermanager.PeerSelectionCriteria{
|
||||
SelectionType: peermanager.Automatic,
|
||||
Proto: filter.FilterSubscribeID_v20beta1,
|
||||
Ctx: ctx,
|
||||
})
|
||||
if err != nil {
|
||||
s.log.Error("selecting peer", zap.Error(err))
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "No suitable peers",
|
||||
}, http.StatusServiceUnavailable)
|
||||
return ""
|
||||
}
|
||||
return peerId
|
||||
}
|
|
@ -0,0 +1,231 @@
|
|||
openapi: 3.0.3
|
||||
info:
|
||||
title: Waku V2 node REST API
|
||||
version: 1.0.0
|
||||
contact:
|
||||
name: VAC Team
|
||||
url: https://forum.vac.dev/
|
||||
tags:
|
||||
- name: filter
|
||||
description: Filter REST API for WakuV2 node
|
||||
|
||||
paths:
|
||||
/filter/v2/subscriptions/{requestId}:
|
||||
get: # get_waku_v2_filter_v2_subscription - ping
|
||||
summary: Subscriber-ping - a peer can query if there is a registered subscription for it
|
||||
description: |
|
||||
Subscriber peer can query its subscription existence on service node.
|
||||
Returns HTTP200 if exists and HTTP404 if not.
|
||||
Client must not fill anything but requestId in the request body.
|
||||
operationId: subscriberPing
|
||||
tags:
|
||||
- filter
|
||||
parameters:
|
||||
- in: path
|
||||
name: requestId
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
description: Id of ping request
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'400':
|
||||
description: Bad request.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'404':
|
||||
description: Not found.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
|
||||
|
||||
/filter/v2/subscriptions:
|
||||
post: # post_waku_v2_filter_v2_subscription
|
||||
summary: Subscribe a peer to an array of content topics under a pubsubTopic
|
||||
description: |
|
||||
Subscribe a peer to an array of content topics under a pubsubTopic.
|
||||
|
||||
It is allowed to refresh or add new content topic to an existing subscription.
|
||||
|
||||
Fields pubsubTopic and contentFilters must be filled.
|
||||
operationId: postSubscriptions
|
||||
tags:
|
||||
- filter
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscribeRequest'
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
# TODO: Review the possible errors of this endpoint
|
||||
'400':
|
||||
description: Bad request.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'404':
|
||||
description: Not found.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
delete: # delete_waku_v2_filter_v2_subscription
|
||||
summary: Unsubscribe a peer from content topics
|
||||
description: |
|
||||
Unsubscribe a peer from content topics
|
||||
Only that subscription will be removed which matches existing.
|
||||
operationId: deleteSubscriptions
|
||||
tags:
|
||||
- filter
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterUnsubscribeRequest'
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'400':
|
||||
description: Bad request.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'404':
|
||||
description: Not found.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
|
||||
/filter/v2/subscriptions/all:
|
||||
delete: # delete_waku_v2_filter_v2_subscription
|
||||
summary: Unsubscribe a peer from all content topics
|
||||
description: |
|
||||
Unsubscribe a peer from all content topics
|
||||
operationId: deleteAllSubscriptions
|
||||
tags:
|
||||
- filter
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterUnsubscribeAllRequest'
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'400':
|
||||
description: Bad request.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'404':
|
||||
description: Not found.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/FilterSubscriptionResponse'
|
||||
|
||||
components:
|
||||
PubSubTopic:
|
||||
type: string
|
||||
ContentTopic:
|
||||
type: string
|
||||
|
||||
FilterSubscriptionResponse:
|
||||
type: object
|
||||
properties:
|
||||
requestId:
|
||||
type: string
|
||||
statusDesc:
|
||||
type: string
|
||||
required:
|
||||
- requestId
|
||||
|
||||
FilterSubscribeRequest:
|
||||
type: object
|
||||
properties:
|
||||
requestId:
|
||||
type: string
|
||||
contentFilters:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/ContentTopic'
|
||||
pubsubTopic:
|
||||
$ref: "#/components/schemas/PubSubTopic"
|
||||
required:
|
||||
- requestId
|
||||
- contentFilters
|
||||
- pubsubTopic
|
||||
|
||||
FilterUnsubscribeRequest:
|
||||
type: object
|
||||
properties:
|
||||
requestId:
|
||||
type: string
|
||||
contentFilters:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/ContentTopic'
|
||||
pubsubTopic:
|
||||
$ref: "#/components/schemas/PubSubTopic"
|
||||
required:
|
||||
- requestId
|
||||
- contentFilters
|
||||
|
||||
FilterUnsubscribeAllRequest:
|
||||
type: object
|
||||
properties:
|
||||
requestId:
|
||||
type: string
|
||||
required:
|
||||
- requestId
|
|
@ -0,0 +1,253 @@
|
|||
package rest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
|
||||
node, err := node.New(opts...)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = node.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
return node
|
||||
}
|
||||
|
||||
// node2 connects to node1
|
||||
func twoFilterConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) {
|
||||
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
|
||||
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
|
||||
|
||||
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
|
||||
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1)
|
||||
require.NoError(t, err)
|
||||
|
||||
if pubSubTopic != "" {
|
||||
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
return node1, node2
|
||||
}
|
||||
|
||||
// test 400, 404 status code for ping rest endpoint
|
||||
// both requests are not successful
|
||||
func TestFilterPingFailure(t *testing.T) {
|
||||
node1, node2 := twoFilterConnectedNodes(t, "")
|
||||
defer func() {
|
||||
node1.Stop()
|
||||
node2.Stop()
|
||||
}()
|
||||
|
||||
router := chi.NewRouter()
|
||||
_ = NewFilterService(node2, router, utils.Logger())
|
||||
|
||||
// with malformed requestId
|
||||
rr := httptest.NewRecorder()
|
||||
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", "invalid_request_id"), nil)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: []byte{},
|
||||
StatusDesc: "bad request id",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusBadRequest, rr.Code)
|
||||
|
||||
// no subscription with peer
|
||||
var requestId filterRequestId = protocol.GenerateRequestID()
|
||||
rr = httptest.NewRecorder()
|
||||
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "ping request failed",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusServiceUnavailable, rr.Code)
|
||||
}
|
||||
|
||||
// create a filter subscription to the peer and try peer that peer
|
||||
// both steps should be successful
|
||||
func TestFilterSubscribeAndPing(t *testing.T) {
|
||||
pubsubTopic := "/waku/2/test/proto"
|
||||
contentTopics := []string{"test"}
|
||||
var requestId filterRequestId = protocol.GenerateRequestID()
|
||||
|
||||
//
|
||||
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
|
||||
defer func() {
|
||||
node1.Stop()
|
||||
node2.Stop()
|
||||
}()
|
||||
|
||||
router := chi.NewRouter()
|
||||
_ = NewFilterService(node2, router, utils.Logger())
|
||||
|
||||
// create subscription to peer
|
||||
rr := httptest.NewRecorder()
|
||||
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
|
||||
RequestId: requestId,
|
||||
PubsubTopic: pubsubTopic,
|
||||
ContentFilters: contentTopics,
|
||||
}))
|
||||
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "OK",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusOK, rr.Code)
|
||||
|
||||
// trying pinging the peer once there is subscription to it
|
||||
rr = httptest.NewRecorder()
|
||||
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "OK",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusOK, rr.Code)
|
||||
}
|
||||
|
||||
// create subscription to peer
|
||||
// delete the subscription to the peer with matching pubSub and contentTopic
|
||||
func TestFilterSubscribeAndUnsubscribe(t *testing.T) {
|
||||
pubsubTopic := "/waku/2/test/proto"
|
||||
contentTopics := []string{"test"}
|
||||
var requestId filterRequestId = protocol.GenerateRequestID()
|
||||
|
||||
//
|
||||
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
|
||||
defer func() {
|
||||
node1.Stop()
|
||||
node2.Stop()
|
||||
}()
|
||||
|
||||
router := chi.NewRouter()
|
||||
_ = NewFilterService(node2, router, utils.Logger())
|
||||
|
||||
// create subscription to peer
|
||||
rr := httptest.NewRecorder()
|
||||
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
|
||||
RequestId: requestId,
|
||||
PubsubTopic: pubsubTopic,
|
||||
ContentFilters: contentTopics,
|
||||
}))
|
||||
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "OK",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusOK, rr.Code)
|
||||
|
||||
// delete the subscription to the peer with matching pubSub and contentTopic
|
||||
requestId = protocol.GenerateRequestID()
|
||||
rr = httptest.NewRecorder()
|
||||
reqReader = strings.NewReader(toString(t, filterSubscriptionRequest{
|
||||
RequestId: requestId,
|
||||
PubsubTopic: pubsubTopic,
|
||||
ContentFilters: contentTopics,
|
||||
}))
|
||||
req, _ = http.NewRequest(http.MethodDelete, filterv2Subscribe, reqReader)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "OK",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusOK, rr.Code)
|
||||
}
|
||||
|
||||
// create 2 subscription from filter client to server
|
||||
// make a unsubscribeAll request
|
||||
// try pinging the peer, if 404 is returned then unsubscribeAll was successful
|
||||
func TestFilterAllUnsubscribe(t *testing.T) {
|
||||
pubsubTopic := "/waku/2/test/proto"
|
||||
contentTopics1 := "ct_1"
|
||||
contentTopics2 := "ct_2"
|
||||
var requestId filterRequestId
|
||||
|
||||
//
|
||||
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
|
||||
defer func() {
|
||||
node1.Stop()
|
||||
node2.Stop()
|
||||
}()
|
||||
|
||||
router := chi.NewRouter()
|
||||
_ = NewFilterService(node2, router, utils.Logger())
|
||||
|
||||
// create 2 different subscription to peer
|
||||
for _, ct := range []string{contentTopics1, contentTopics2} {
|
||||
requestId = protocol.GenerateRequestID()
|
||||
rr := httptest.NewRecorder()
|
||||
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
|
||||
RequestId: requestId,
|
||||
PubsubTopic: pubsubTopic,
|
||||
ContentFilters: []string{ct},
|
||||
}))
|
||||
req, _ := http.NewRequest(http.MethodPost, filterv2Subscribe, reqReader)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "OK",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusOK, rr.Code)
|
||||
}
|
||||
|
||||
// delete all subscription to the peer
|
||||
requestId = protocol.GenerateRequestID()
|
||||
rr := httptest.NewRecorder()
|
||||
reqReader := strings.NewReader(toString(t, filterUnsubscribeAllRequest{
|
||||
RequestId: requestId,
|
||||
}))
|
||||
req, _ := http.NewRequest(http.MethodDelete, filterv2SubscribeAll, reqReader)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "OK",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusOK, rr.Code)
|
||||
|
||||
// check if all subscriptions are deleted to the peer are deleted
|
||||
requestId = protocol.GenerateRequestID()
|
||||
rr = httptest.NewRecorder()
|
||||
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestId), nil)
|
||||
router.ServeHTTP(rr, req)
|
||||
checkJSON(t, filterSubscriptionResponse{
|
||||
RequestId: requestId,
|
||||
StatusDesc: "ping request failed",
|
||||
}, getFilterResponse(t, rr.Body))
|
||||
require.Equal(t, http.StatusServiceUnavailable, rr.Code)
|
||||
}
|
||||
|
||||
func checkJSON(t *testing.T, expected, actual interface{}) {
|
||||
require.JSONEq(t, toString(t, expected), toString(t, actual))
|
||||
}
|
||||
func getFilterResponse(t *testing.T, body *bytes.Buffer) filterSubscriptionResponse {
|
||||
resp := filterSubscriptionResponse{}
|
||||
err := json.Unmarshal(body.Bytes(), &resp)
|
||||
require.NoError(t, err)
|
||||
return resp
|
||||
}
|
||||
func toString(t *testing.T, data interface{}) string {
|
||||
bytes, err := json.Marshal(data)
|
||||
require.NoError(t, err)
|
||||
return string(bytes)
|
||||
}
|
|
@ -2,7 +2,6 @@ package rest
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
@ -19,20 +18,10 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
func createLightPushNode(t *testing.T) *node.WakuNode {
|
||||
node, err := node.New(node.WithLightPush(), node.WithWakuRelay())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = node.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
return node
|
||||
}
|
||||
|
||||
// node2 connects to node1
|
||||
func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) {
|
||||
node1 := createLightPushNode(t)
|
||||
node2 := createLightPushNode(t)
|
||||
node1 := createNode(t, node.WithLightPush(), node.WithWakuRelay())
|
||||
node2 := createNode(t, node.WithLightPush(), node.WithWakuRelay())
|
||||
|
||||
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
|
||||
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1)
|
||||
|
|
|
@ -34,11 +34,9 @@ func writeResponse(w http.ResponseWriter, value interface{}, code int) {
|
|||
return
|
||||
}
|
||||
|
||||
_, err = w.Write(jsonResponse)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// w.Write implicitly writes a 200 status code
|
||||
// and only once we can write 2xx-5xx status code
|
||||
// so any statusCode apart from 1xx being written to the header, will be ignored.
|
||||
w.WriteHeader(code)
|
||||
_, _ = w.Write(jsonResponse)
|
||||
}
|
||||
|
|
|
@ -60,6 +60,10 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
|
|||
_ = NewAdminService(node, mux, wrpc.log)
|
||||
}
|
||||
|
||||
if node.FilterLightnode() != nil {
|
||||
_ = NewFilterService(node, mux, log)
|
||||
}
|
||||
|
||||
return wrpc
|
||||
}
|
||||
|
||||
|
|
|
@ -636,6 +636,11 @@ func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightNode {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PeerManager for getting peer filterv2 protocol
|
||||
func (w *WakuNode) PeerManager() *peermanager.PeerManager {
|
||||
return w.peermanager
|
||||
}
|
||||
|
||||
// Lightpush is used to access any operation related to Waku Lightpush protocol
|
||||
func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
|
||||
if result, ok := w.lightPush.(*lightpush.WakuLightPush); ok {
|
||||
|
|
|
@ -298,7 +298,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
|
|||
} else {
|
||||
selectedPeer = params.selectedPeer
|
||||
}
|
||||
|
||||
if selectedPeer == "" {
|
||||
wf.metrics.RecordError(peerNotFoundFailure)
|
||||
wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
|
||||
|
@ -364,16 +363,24 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO
|
|||
return params, nil
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
|
||||
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
params := &FilterPingParameters{}
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
}
|
||||
if len(params.requestID) == 0 {
|
||||
params.requestID = protocol.GenerateRequestID()
|
||||
}
|
||||
|
||||
return wf.request(
|
||||
ctx,
|
||||
&FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()},
|
||||
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
|
||||
pb.FilterSubscribeRequest_SUBSCRIBER_PING,
|
||||
protocol.ContentFilter{})
|
||||
}
|
||||
|
|
|
@ -18,6 +18,19 @@ func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
|
|||
}
|
||||
}
|
||||
|
||||
type (
|
||||
FilterPingParameters struct {
|
||||
requestID []byte
|
||||
}
|
||||
FilterPingOption func(*FilterPingParameters)
|
||||
)
|
||||
|
||||
func WithPingRequestId(requestId []byte) FilterPingOption {
|
||||
return func(params *FilterPingParameters) {
|
||||
params.requestID = requestId
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
FilterSubscribeParameters struct {
|
||||
selectedPeer peer.ID
|
||||
|
|
Loading…
Reference in New Issue