feat: Add rpc filter

This commit is contained in:
Anthony Laibe 2021-11-17 17:04:54 +01:00
parent 2ae370ca41
commit 38e5fdbe3e
4 changed files with 148 additions and 1 deletions

61
waku/v2/rpc/filter.go Normal file
View File

@ -0,0 +1,61 @@
package rpc
import (
"net/http"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
type FilterService struct {
node *node.WakuNode
}
type FilterContentFilterArgs struct {
Topic string `json:"topic,omitempty"`
ContentFilters []pb.ContentFilter `json:"contentFilters,omitempty"`
}
func makeContentFilter(args *FilterContentFilterArgs) filter.ContentFilter {
var contentTopics []string
for _, contentFilter := range args.ContentFilters {
contentTopics = append(contentTopics, contentFilter.ContentTopic)
}
return filter.ContentFilter{
Topic: args.Topic,
ContentTopics: contentTopics,
}
}
func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error {
_, _, err := f.node.Filter().Subscribe(
req.Context(),
makeContentFilter(args),
filter.WithAutomaticPeerSelection(),
)
if err != nil {
log.Error("Error subscribing to topic:", args.Topic, "err:", err)
reply.Success = false
reply.Error = err.Error()
return nil
}
reply.Success = true
return nil
}
func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error {
err := f.node.Filter().UnsubscribeFilter(
req.Context(),
makeContentFilter(args),
)
if err != nil {
log.Error("Error unsubscribing to topic:", args.Topic, "err:", err)
reply.Success = false
reply.Error = err.Error()
return nil
}
reply.Success = true
return nil
}

View File

@ -0,0 +1,81 @@
package rpc
import (
"context"
"crypto/rand"
"fmt"
"testing"
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/tests"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/stretchr/testify/require"
)
var testTopic = "test"
func makeFilterService(t *testing.T) *FilterService {
n, err := node.New(context.Background(), node.WithWakuFilter(true), node.WithWakuRelay())
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
_, err = n.Relay().Subscribe(context.Background(), (*relay.Topic)(&testTopic))
require.NoError(t, err)
return &FilterService{n}
}
func TestFilterSubscription(t *testing.T) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10))
require.NoError(t, err)
_, err = node.Subscribe(context.Background(), (*relay.Topic)(&testTopic))
require.NoError(t, err)
_ = filter.NewWakuFilter(context.Background(), host, false)
d := makeFilterService(t)
defer d.node.Stop()
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty()))
require.NoError(t, err)
var addr multiaddr.Multiaddr
for _, a := range host.Addrs() {
addr = a.Encapsulate(hostInfo)
break
}
_, err = d.node.AddPeer(addr, filter.FilterID_v20beta1)
require.NoError(t, err)
args := &FilterContentFilterArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}}
var reply SuccessReply
err = d.PostV1Subscription(
makeRequest(t),
args,
&reply,
)
require.NoError(t, err)
require.True(t, reply.Success)
err = d.DeleteV1Subscription(
makeRequest(t),
args,
&reply,
)
require.NoError(t, err)
require.True(t, reply.Success)
}

View File

@ -31,7 +31,7 @@ func TestPostV1Message(t *testing.T) {
require.True(t, reply.Success)
}
func TestSubscription(t *testing.T) {
func TestRelaySubscription(t *testing.T) {
var reply SuccessReply
d := makeRelayService(t)

View File

@ -43,6 +43,11 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc {
log.Error(err)
}
err = s.RegisterService(&FilterService{node}, "Filter")
if err != nil {
log.Error(err)
}
mux := http.NewServeMux()
mux.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
t := time.Now()