diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go new file mode 100644 index 00000000..b8a3e870 --- /dev/null +++ b/waku/v2/rpc/filter.go @@ -0,0 +1,61 @@ +package rpc + +import ( + "net/http" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/filter" + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +type FilterService struct { + node *node.WakuNode +} + +type FilterContentFilterArgs struct { + Topic string `json:"topic,omitempty"` + ContentFilters []pb.ContentFilter `json:"contentFilters,omitempty"` +} + +func makeContentFilter(args *FilterContentFilterArgs) filter.ContentFilter { + var contentTopics []string + for _, contentFilter := range args.ContentFilters { + contentTopics = append(contentTopics, contentFilter.ContentTopic) + } + + return filter.ContentFilter{ + Topic: args.Topic, + ContentTopics: contentTopics, + } +} + +func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { + _, _, err := f.node.Filter().Subscribe( + req.Context(), + makeContentFilter(args), + filter.WithAutomaticPeerSelection(), + ) + if err != nil { + log.Error("Error subscribing to topic:", args.Topic, "err:", err) + reply.Success = false + reply.Error = err.Error() + return nil + } + reply.Success = true + return nil +} + +func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { + err := f.node.Filter().UnsubscribeFilter( + req.Context(), + makeContentFilter(args), + ) + if err != nil { + log.Error("Error unsubscribing to topic:", args.Topic, "err:", err) + reply.Success = false + reply.Error = err.Error() + return nil + } + reply.Success = true + return nil +} diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go new file mode 100644 index 00000000..9812484f --- /dev/null +++ b/waku/v2/rpc/filter_test.go @@ -0,0 +1,81 @@ +package rpc + +import ( + "context" + "crypto/rand" + "fmt" + "testing" + + "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/tests" + v2 "github.com/status-im/go-waku/waku/v2" + "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/filter" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/relay" + "github.com/stretchr/testify/require" +) + +var testTopic = "test" + +func makeFilterService(t *testing.T) *FilterService { + n, err := node.New(context.Background(), node.WithWakuFilter(true), node.WithWakuRelay()) + require.NoError(t, err) + err = n.Start() + require.NoError(t, err) + + _, err = n.Relay().Subscribe(context.Background(), (*relay.Topic)(&testTopic)) + require.NoError(t, err) + + return &FilterService{n} +} + +func TestFilterSubscription(t *testing.T) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) + require.NoError(t, err) + + _, err = node.Subscribe(context.Background(), (*relay.Topic)(&testTopic)) + require.NoError(t, err) + + _ = filter.NewWakuFilter(context.Background(), host, false) + + d := makeFilterService(t) + defer d.node.Stop() + + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty())) + require.NoError(t, err) + + var addr multiaddr.Multiaddr + for _, a := range host.Addrs() { + addr = a.Encapsulate(hostInfo) + break + } + + _, err = d.node.AddPeer(addr, filter.FilterID_v20beta1) + require.NoError(t, err) + + args := &FilterContentFilterArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}} + + var reply SuccessReply + err = d.PostV1Subscription( + makeRequest(t), + args, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) + + err = d.DeleteV1Subscription( + makeRequest(t), + args, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) +} diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index 83342adc..b59de4e4 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -31,7 +31,7 @@ func TestPostV1Message(t *testing.T) { require.True(t, reply.Success) } -func TestSubscription(t *testing.T) { +func TestRelaySubscription(t *testing.T) { var reply SuccessReply d := makeRelayService(t) diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index 1cd045ab..debfbc07 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -43,6 +43,11 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { log.Error(err) } + err = s.RegisterService(&FilterService{node}, "Filter") + if err != nil { + log.Error(err) + } + mux := http.NewServeMux() mux.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) { t := time.Now()