diff --git a/waku/node.go b/waku/node.go index fe95973b..43963883 100644 --- a/waku/node.go +++ b/waku/node.go @@ -273,7 +273,7 @@ func Execute(options Options) { var rpcServer *rpc.WakuRpc if options.RPCServer.Enable { rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port) - go rpcServer.Start() + rpcServer.Start() } // Wait for a SIGINT or SIGTERM signal diff --git a/waku/v2/protocol/filter/filter_subscribers.go b/waku/v2/protocol/filter/filter_subscribers.go index 7703e736..3b5c2b9e 100644 --- a/waku/v2/protocol/filter/filter_subscribers.go +++ b/waku/v2/protocol/filter/filter_subscribers.go @@ -22,21 +22,21 @@ func NewSubscribers() *Subscribers { return &Subscribers{} } -func (self *Subscribers) Append(s Subscriber) int { - self.Lock() - defer self.Unlock() +func (sub *Subscribers) Append(s Subscriber) int { + sub.Lock() + defer sub.Unlock() - self.subscribers = append(self.subscribers, s) - return len(self.subscribers) + sub.subscribers = append(sub.subscribers, s) + return len(sub.subscribers) } -func (self *Subscribers) Items() <-chan Subscriber { +func (sub *Subscribers) Items() <-chan Subscriber { c := make(chan Subscriber) f := func() { - self.RLock() - defer self.RUnlock() - for _, value := range self.subscribers { + sub.RLock() + defer sub.RUnlock() + for _, value := range sub.subscribers { c <- value } close(c) @@ -46,17 +46,17 @@ func (self *Subscribers) Items() <-chan Subscriber { return c } -func (self *Subscribers) Length() int { - self.RLock() - defer self.RUnlock() +func (sub *Subscribers) Length() int { + sub.RLock() + defer sub.RUnlock() - return len(self.subscribers) + return len(sub.subscribers) } -func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) { +func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) { var peerIdsToRemove []peer.ID - for _, subscriber := range self.subscribers { + for _, subscriber := range sub.subscribers { if subscriber.peer != peerID { continue } @@ -82,11 +82,11 @@ func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []* // make sure we delete the subscriber // if no more content filters left for _, peerId := range peerIdsToRemove { - for i, s := range self.subscribers { + for i, s := range sub.subscribers { if s.peer == peerId { - l := len(self.subscribers) - 1 - self.subscribers[l], self.subscribers[i] = self.subscribers[i], self.subscribers[l] - self.subscribers = self.subscribers[:l] + l := len(sub.subscribers) - 1 + sub.subscribers[l], sub.subscribers[i] = sub.subscribers[i], sub.subscribers[l] + sub.subscribers = sub.subscribers[:l] break } } diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index 1470aaeb..639622a8 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -3,22 +3,42 @@ package rpc import ( "fmt" "net/http" + "sync" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/pb" ) type FilterService struct { node *node.WakuNode + + messages map[string][]*pb.WakuMessage + messagesMutex sync.RWMutex + + ch chan *protocol.Envelope + quit chan bool } -type FilterContentFilterArgs struct { +type FilterContentArgs struct { Topic string `json:"topic,omitempty"` ContentFilters []pb.ContentFilter `json:"contentFilters,omitempty"` } -func makeContentFilter(args *FilterContentFilterArgs) filter.ContentFilter { +type ContentTopicArgs struct { + ContentTopic string `json:"contentTopic,omitempty"` +} + +func NewFilterService(node *node.WakuNode) *FilterService { + return &FilterService{ + node: node, + messages: make(map[string][]*pb.WakuMessage), + quit: make(chan bool), + } +} + +func makeContentFilter(args *FilterContentArgs) filter.ContentFilter { var contentTopics []string for _, contentFilter := range args.ContentFilters { contentTopics = append(contentTopics, contentFilter.ContentTopic) @@ -30,7 +50,39 @@ func makeContentFilter(args *FilterContentFilterArgs) filter.ContentFilter { } } -func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { +func (f *FilterService) addEnvelope(envelope *protocol.Envelope) { + f.messagesMutex.Lock() + defer f.messagesMutex.Unlock() + + contentTopic := envelope.Message().ContentTopic + if _, ok := f.messages[contentTopic]; !ok { + return + } + + f.messages[contentTopic] = append(f.messages[contentTopic], envelope.Message()) +} + +func (f *FilterService) Start() { + f.ch = make(chan *protocol.Envelope, 1024) + f.node.Broadcaster().Register(f.ch) + + for { + select { + case <-f.quit: + return + case envelope := <-f.ch: + f.addEnvelope(envelope) + } + } +} + +func (f *FilterService) Stop() { + f.quit <- true + f.node.Broadcaster().Unregister(f.ch) + close(f.ch) +} + +func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error { _, _, err := f.node.Filter().Subscribe( req.Context(), makeContentFilter(args), @@ -42,11 +94,14 @@ func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterConten reply.Error = err.Error() return nil } + for _, contentFilter := range args.ContentFilters { + f.messages[contentFilter.ContentTopic] = make([]*pb.WakuMessage, 0) + } reply.Success = true return nil } -func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { +func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error { err := f.node.Filter().UnsubscribeFilter( req.Context(), makeContentFilter(args), @@ -57,10 +112,23 @@ func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterCont reply.Error = err.Error() return nil } + for _, contentFilter := range args.ContentFilters { + delete(f.messages, contentFilter.ContentTopic) + } + reply.Success = true return nil } -func (f *FilterService) GetV1Messages(req *http.Request, args *Empty, reply *Empty) error { - return fmt.Errorf("not implemented") +func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs, reply *MessagesReply) error { + f.messagesMutex.Lock() + defer f.messagesMutex.Unlock() + + if _, ok := f.messages[args.ContentTopic]; !ok { + return fmt.Errorf("topic %s not subscribed", args.ContentTopic) + } + + reply.Messages = f.messages[args.ContentTopic] + f.messages[args.ContentTopic] = make([]*pb.WakuMessage, 0) + return nil } diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 5fac092c..f5f0b3c9 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "fmt" "testing" + "time" "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/tests" @@ -27,7 +28,7 @@ func makeFilterService(t *testing.T) *FilterService { _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - return &FilterService{n} + return NewFilterService(n) } func TestFilterSubscription(t *testing.T) { @@ -60,7 +61,7 @@ func TestFilterSubscription(t *testing.T) { _, err = d.node.AddPeer(addr, filter.FilterID_v20beta1) require.NoError(t, err) - args := &FilterContentFilterArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}} + args := &FilterContentArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}} var reply SuccessReply err = d.PostV1Subscription( @@ -79,3 +80,66 @@ func TestFilterSubscription(t *testing.T) { require.NoError(t, err) require.True(t, reply.Success) } + +func TestFilterGetV1Messages(t *testing.T) { + serviceA := makeFilterService(t) + var reply SuccessReply + + serviceB := makeFilterService(t) + go serviceB.Start() + defer serviceB.Stop() + + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) + require.NoError(t, err) + + var addr multiaddr.Multiaddr + for _, a := range serviceB.node.Host().Addrs() { + addr = a.Encapsulate(hostInfo) + break + } + err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) + require.NoError(t, err) + + // Wait for the dial to complete + time.Sleep(1 * time.Second) + + args := &FilterContentArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}} + err = serviceB.PostV1Subscription( + makeRequest(t), + args, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) + + // Wait for the subscription to be started + time.Sleep(1 * time.Second) + + _, err = serviceA.node.Relay().Publish( + context.Background(), + &pb.WakuMessage{ContentTopic: "ct"}, + (*relay.Topic)(&testTopic), + ) + require.NoError(t, err) + require.True(t, reply.Success) + + // Wait for the message to be received + time.Sleep(1 * time.Second) + + var messagesReply MessagesReply + err = serviceB.GetV1Messages( + makeRequest(t), + &ContentTopicArgs{"ct"}, + &messagesReply, + ) + require.NoError(t, err) + require.Len(t, messagesReply.Messages, 1) + + err = serviceB.GetV1Messages( + makeRequest(t), + &ContentTopicArgs{"ct"}, + &messagesReply, + ) + require.NoError(t, err) + require.Len(t, messagesReply.Messages, 0) +} diff --git a/waku/v2/rpc/private.go b/waku/v2/rpc/private.go index 8189b672..839267ca 100644 --- a/waku/v2/rpc/private.go +++ b/waku/v2/rpc/private.go @@ -53,18 +53,18 @@ func (p *PrivateService) GetV1AsymmetricKeypair(req *http.Request, args *Empty, return nil } -func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { +func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *Empty, reply *SuccessReply) error { return fmt.Errorf("not implemented") } -func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { +func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Empty, reply *SuccessReply) error { return fmt.Errorf("not implemented") } -func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { +func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *Empty, reply *SuccessReply) error { return fmt.Errorf("not implemented") } -func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *FilterContentFilterArgs, reply *SuccessReply) error { +func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *Empty, reply *SuccessReply) error { return fmt.Errorf("not implemented") } diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 927f7684..25cf65e2 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -33,10 +33,6 @@ type TopicArgs struct { Topic string `json:"topic,omitempty"` } -type MessagesReply struct { - Messages []*pb.WakuMessage `json:"messages,omitempty"` -} - func NewRelayService(node *node.WakuNode) *RelayService { return &RelayService{ node: node, diff --git a/waku/v2/rpc/rpc_type.go b/waku/v2/rpc/rpc_type.go index 82576d18..391bee20 100644 --- a/waku/v2/rpc/rpc_type.go +++ b/waku/v2/rpc/rpc_type.go @@ -1,5 +1,7 @@ package rpc +import "github.com/status-im/go-waku/waku/v2/protocol/pb" + type SuccessReply struct { Success bool `json:"success,omitempty"` Error string `json:"error,omitempty"` @@ -7,3 +9,7 @@ type SuccessReply struct { type Empty struct { } + +type MessagesReply struct { + Messages []*pb.WakuMessage `json:"messages,omitempty"` +} diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index d433b4e3..5c1b82ad 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -17,7 +17,8 @@ type WakuRpc struct { node *node.WakuNode server *http.Server - relayService *RelayService + relayService *RelayService + filterService *FilterService } func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { @@ -31,7 +32,6 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { } relayService := NewRelayService(node) - err = s.RegisterService(relayService, "Relay") if err != nil { log.Error(err) @@ -47,7 +47,8 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { log.Error(err) } - err = s.RegisterService(&FilterService{node}, "Filter") + filterService := NewFilterService(node) + err = s.RegisterService(filterService, "Filter") if err != nil { log.Error(err) } @@ -71,20 +72,29 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { Handler: mux, } + server.RegisterOnShutdown(func() { + filterService.Stop() + relayService.Stop() + }) + return &WakuRpc{ - node: node, - server: server, - relayService: relayService, + node: node, + server: server, + relayService: relayService, + filterService: filterService, } } func (r *WakuRpc) Start() { go r.relayService.Start() - defer r.relayService.Stop() + go r.filterService.Start() + go func() { + _ = r.server.ListenAndServe() + }() log.Info("Rpc server started at ", r.server.Addr) - log.Info("server stopped ", r.server.ListenAndServe()) } func (r *WakuRpc) Stop(ctx context.Context) error { + log.Info("Shutting down rpc server") return r.server.Shutdown(ctx) }