feat: Add rpc method for filter#getmessages

This commit is contained in:
Anthony Laibe 2021-11-19 15:17:38 +01:00
parent 79bb101787
commit 2afffd9dd9
8 changed files with 188 additions and 44 deletions

View File

@ -273,7 +273,7 @@ func Execute(options Options) {
var rpcServer *rpc.WakuRpc var rpcServer *rpc.WakuRpc
if options.RPCServer.Enable { if options.RPCServer.Enable {
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port) rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port)
go rpcServer.Start() rpcServer.Start()
} }
// Wait for a SIGINT or SIGTERM signal // Wait for a SIGINT or SIGTERM signal

View File

@ -22,21 +22,21 @@ func NewSubscribers() *Subscribers {
return &Subscribers{} return &Subscribers{}
} }
func (self *Subscribers) Append(s Subscriber) int { func (sub *Subscribers) Append(s Subscriber) int {
self.Lock() sub.Lock()
defer self.Unlock() defer sub.Unlock()
self.subscribers = append(self.subscribers, s) sub.subscribers = append(sub.subscribers, s)
return len(self.subscribers) return len(sub.subscribers)
} }
func (self *Subscribers) Items() <-chan Subscriber { func (sub *Subscribers) Items() <-chan Subscriber {
c := make(chan Subscriber) c := make(chan Subscriber)
f := func() { f := func() {
self.RLock() sub.RLock()
defer self.RUnlock() defer sub.RUnlock()
for _, value := range self.subscribers { for _, value := range sub.subscribers {
c <- value c <- value
} }
close(c) close(c)
@ -46,17 +46,17 @@ func (self *Subscribers) Items() <-chan Subscriber {
return c return c
} }
func (self *Subscribers) Length() int { func (sub *Subscribers) Length() int {
self.RLock() sub.RLock()
defer self.RUnlock() defer sub.RUnlock()
return len(self.subscribers) return len(sub.subscribers)
} }
func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) { func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) {
var peerIdsToRemove []peer.ID var peerIdsToRemove []peer.ID
for _, subscriber := range self.subscribers { for _, subscriber := range sub.subscribers {
if subscriber.peer != peerID { if subscriber.peer != peerID {
continue continue
} }
@ -82,11 +82,11 @@ func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*
// make sure we delete the subscriber // make sure we delete the subscriber
// if no more content filters left // if no more content filters left
for _, peerId := range peerIdsToRemove { for _, peerId := range peerIdsToRemove {
for i, s := range self.subscribers { for i, s := range sub.subscribers {
if s.peer == peerId { if s.peer == peerId {
l := len(self.subscribers) - 1 l := len(sub.subscribers) - 1
self.subscribers[l], self.subscribers[i] = self.subscribers[i], self.subscribers[l] sub.subscribers[l], sub.subscribers[i] = sub.subscribers[i], sub.subscribers[l]
self.subscribers = self.subscribers[:l] sub.subscribers = sub.subscribers[:l]
break break
} }
} }

View File

@ -3,22 +3,42 @@ package rpc
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"sync"
"github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
) )
type FilterService struct { type FilterService struct {
node *node.WakuNode node *node.WakuNode
messages map[string][]*pb.WakuMessage
messagesMutex sync.RWMutex
ch chan *protocol.Envelope
quit chan bool
} }
type FilterContentFilterArgs struct { type FilterContentArgs struct {
Topic string `json:"topic,omitempty"` Topic string `json:"topic,omitempty"`
ContentFilters []pb.ContentFilter `json:"contentFilters,omitempty"` ContentFilters []pb.ContentFilter `json:"contentFilters,omitempty"`
} }
func makeContentFilter(args *FilterContentFilterArgs) filter.ContentFilter { type ContentTopicArgs struct {
ContentTopic string `json:"contentTopic,omitempty"`
}
func NewFilterService(node *node.WakuNode) *FilterService {
return &FilterService{
node: node,
messages: make(map[string][]*pb.WakuMessage),
quit: make(chan bool),
}
}
func makeContentFilter(args *FilterContentArgs) filter.ContentFilter {
var contentTopics []string var contentTopics []string
for _, contentFilter := range args.ContentFilters { for _, contentFilter := range args.ContentFilters {
contentTopics = append(contentTopics, contentFilter.ContentTopic) contentTopics = append(contentTopics, contentFilter.ContentTopic)
@ -30,7 +50,39 @@ func makeContentFilter(args *FilterContentFilterArgs) filter.ContentFilter {
} }
} }
func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { func (f *FilterService) addEnvelope(envelope *protocol.Envelope) {
f.messagesMutex.Lock()
defer f.messagesMutex.Unlock()
contentTopic := envelope.Message().ContentTopic
if _, ok := f.messages[contentTopic]; !ok {
return
}
f.messages[contentTopic] = append(f.messages[contentTopic], envelope.Message())
}
func (f *FilterService) Start() {
f.ch = make(chan *protocol.Envelope, 1024)
f.node.Broadcaster().Register(f.ch)
for {
select {
case <-f.quit:
return
case envelope := <-f.ch:
f.addEnvelope(envelope)
}
}
}
func (f *FilterService) Stop() {
f.quit <- true
f.node.Broadcaster().Unregister(f.ch)
close(f.ch)
}
func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
_, _, err := f.node.Filter().Subscribe( _, _, err := f.node.Filter().Subscribe(
req.Context(), req.Context(),
makeContentFilter(args), makeContentFilter(args),
@ -42,11 +94,14 @@ func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterConten
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil
} }
for _, contentFilter := range args.ContentFilters {
f.messages[contentFilter.ContentTopic] = make([]*pb.WakuMessage, 0)
}
reply.Success = true reply.Success = true
return nil return nil
} }
func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
err := f.node.Filter().UnsubscribeFilter( err := f.node.Filter().UnsubscribeFilter(
req.Context(), req.Context(),
makeContentFilter(args), makeContentFilter(args),
@ -57,10 +112,23 @@ func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterCont
reply.Error = err.Error() reply.Error = err.Error()
return nil return nil
} }
for _, contentFilter := range args.ContentFilters {
delete(f.messages, contentFilter.ContentTopic)
}
reply.Success = true reply.Success = true
return nil return nil
} }
func (f *FilterService) GetV1Messages(req *http.Request, args *Empty, reply *Empty) error { func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs, reply *MessagesReply) error {
return fmt.Errorf("not implemented") f.messagesMutex.Lock()
defer f.messagesMutex.Unlock()
if _, ok := f.messages[args.ContentTopic]; !ok {
return fmt.Errorf("topic %s not subscribed", args.ContentTopic)
}
reply.Messages = f.messages[args.ContentTopic]
f.messages[args.ContentTopic] = make([]*pb.WakuMessage, 0)
return nil
} }

View File

@ -5,6 +5,7 @@ import (
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/tests"
@ -27,7 +28,7 @@ func makeFilterService(t *testing.T) *FilterService {
_, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err) require.NoError(t, err)
return &FilterService{n} return NewFilterService(n)
} }
func TestFilterSubscription(t *testing.T) { func TestFilterSubscription(t *testing.T) {
@ -60,7 +61,7 @@ func TestFilterSubscription(t *testing.T) {
_, err = d.node.AddPeer(addr, filter.FilterID_v20beta1) _, err = d.node.AddPeer(addr, filter.FilterID_v20beta1)
require.NoError(t, err) require.NoError(t, err)
args := &FilterContentFilterArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}} args := &FilterContentArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}}
var reply SuccessReply var reply SuccessReply
err = d.PostV1Subscription( err = d.PostV1Subscription(
@ -79,3 +80,66 @@ func TestFilterSubscription(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, reply.Success) require.True(t, reply.Success)
} }
func TestFilterGetV1Messages(t *testing.T) {
serviceA := makeFilterService(t)
var reply SuccessReply
serviceB := makeFilterService(t)
go serviceB.Start()
defer serviceB.Stop()
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty()))
require.NoError(t, err)
var addr multiaddr.Multiaddr
for _, a := range serviceB.node.Host().Addrs() {
addr = a.Encapsulate(hostInfo)
break
}
err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr)
require.NoError(t, err)
// Wait for the dial to complete
time.Sleep(1 * time.Second)
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}}
err = serviceB.PostV1Subscription(
makeRequest(t),
args,
&reply,
)
require.NoError(t, err)
require.True(t, reply.Success)
// Wait for the subscription to be started
time.Sleep(1 * time.Second)
_, err = serviceA.node.Relay().Publish(
context.Background(),
&pb.WakuMessage{ContentTopic: "ct"},
(*relay.Topic)(&testTopic),
)
require.NoError(t, err)
require.True(t, reply.Success)
// Wait for the message to be received
time.Sleep(1 * time.Second)
var messagesReply MessagesReply
err = serviceB.GetV1Messages(
makeRequest(t),
&ContentTopicArgs{"ct"},
&messagesReply,
)
require.NoError(t, err)
require.Len(t, messagesReply.Messages, 1)
err = serviceB.GetV1Messages(
makeRequest(t),
&ContentTopicArgs{"ct"},
&messagesReply,
)
require.NoError(t, err)
require.Len(t, messagesReply.Messages, 0)
}

View File

@ -53,18 +53,18 @@ func (p *PrivateService) GetV1AsymmetricKeypair(req *http.Request, args *Empty,
return nil return nil
} }
func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *Empty, reply *SuccessReply) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Empty, reply *SuccessReply) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *Empty, reply *SuccessReply) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *Empty, reply *SuccessReply) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }

View File

@ -33,10 +33,6 @@ type TopicArgs struct {
Topic string `json:"topic,omitempty"` Topic string `json:"topic,omitempty"`
} }
type MessagesReply struct {
Messages []*pb.WakuMessage `json:"messages,omitempty"`
}
func NewRelayService(node *node.WakuNode) *RelayService { func NewRelayService(node *node.WakuNode) *RelayService {
return &RelayService{ return &RelayService{
node: node, node: node,

View File

@ -1,5 +1,7 @@
package rpc package rpc
import "github.com/status-im/go-waku/waku/v2/protocol/pb"
type SuccessReply struct { type SuccessReply struct {
Success bool `json:"success,omitempty"` Success bool `json:"success,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
@ -7,3 +9,7 @@ type SuccessReply struct {
type Empty struct { type Empty struct {
} }
type MessagesReply struct {
Messages []*pb.WakuMessage `json:"messages,omitempty"`
}

View File

@ -17,7 +17,8 @@ type WakuRpc struct {
node *node.WakuNode node *node.WakuNode
server *http.Server server *http.Server
relayService *RelayService relayService *RelayService
filterService *FilterService
} }
func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc {
@ -31,7 +32,6 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc {
} }
relayService := NewRelayService(node) relayService := NewRelayService(node)
err = s.RegisterService(relayService, "Relay") err = s.RegisterService(relayService, "Relay")
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -47,7 +47,8 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc {
log.Error(err) log.Error(err)
} }
err = s.RegisterService(&FilterService{node}, "Filter") filterService := NewFilterService(node)
err = s.RegisterService(filterService, "Filter")
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
@ -71,20 +72,29 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc {
Handler: mux, Handler: mux,
} }
server.RegisterOnShutdown(func() {
filterService.Stop()
relayService.Stop()
})
return &WakuRpc{ return &WakuRpc{
node: node, node: node,
server: server, server: server,
relayService: relayService, relayService: relayService,
filterService: filterService,
} }
} }
func (r *WakuRpc) Start() { func (r *WakuRpc) Start() {
go r.relayService.Start() go r.relayService.Start()
defer r.relayService.Stop() go r.filterService.Start()
go func() {
_ = r.server.ListenAndServe()
}()
log.Info("Rpc server started at ", r.server.Addr) log.Info("Rpc server started at ", r.server.Addr)
log.Info("server stopped ", r.server.ListenAndServe())
} }
func (r *WakuRpc) Stop(ctx context.Context) error { func (r *WakuRpc) Stop(ctx context.Context) error {
log.Info("Shutting down rpc server")
return r.server.Shutdown(ctx) return r.server.Shutdown(ctx)
} }