diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index 639622a8..cfafdd7b 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -17,8 +17,7 @@ type FilterService struct { messages map[string][]*pb.WakuMessage messagesMutex sync.RWMutex - ch chan *protocol.Envelope - quit chan bool + runner *runnerService } type FilterContentArgs struct { @@ -31,11 +30,12 @@ type ContentTopicArgs struct { } func NewFilterService(node *node.WakuNode) *FilterService { - return &FilterService{ + s := &FilterService{ node: node, messages: make(map[string][]*pb.WakuMessage), - quit: make(chan bool), } + s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) + return s } func makeContentFilter(args *FilterContentArgs) filter.ContentFilter { @@ -63,23 +63,11 @@ func (f *FilterService) addEnvelope(envelope *protocol.Envelope) { } func (f *FilterService) Start() { - f.ch = make(chan *protocol.Envelope, 1024) - f.node.Broadcaster().Register(f.ch) - - for { - select { - case <-f.quit: - return - case envelope := <-f.ch: - f.addEnvelope(envelope) - } - } + f.runner.Start() } func (f *FilterService) Stop() { - f.quit <- true - f.node.Broadcaster().Unregister(f.ch) - close(f.ch) + f.runner.Stop() } func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error { diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index f5f0b3c9..36f5853f 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -115,10 +115,10 @@ func TestFilterGetV1Messages(t *testing.T) { // Wait for the subscription to be started time.Sleep(1 * time.Second) - _, err = serviceA.node.Relay().Publish( + _, err = serviceA.node.Relay().PublishToTopic( context.Background(), &pb.WakuMessage{ContentTopic: "ct"}, - (*relay.Topic)(&testTopic), + testTopic, ) require.NoError(t, err) require.True(t, reply.Success) diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 25cf65e2..708e91f7 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -16,8 +16,7 @@ type RelayService struct { messages map[string][]*pb.WakuMessage messagesMutex sync.RWMutex - ch chan *protocol.Envelope - quit chan bool + runner *runnerService } type RelayMessageArgs struct { @@ -34,11 +33,13 @@ type TopicArgs struct { } func NewRelayService(node *node.WakuNode) *RelayService { - return &RelayService{ + s := &RelayService{ node: node, messages: make(map[string][]*pb.WakuMessage), - quit: make(chan bool), } + + s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) + return s } func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { @@ -53,23 +54,11 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { } func (r *RelayService) Start() { - r.ch = make(chan *protocol.Envelope, 1024) - r.node.Broadcaster().Register(r.ch) - - for { - select { - case <-r.quit: - return - case envelope := <-r.ch: - r.addEnvelope(envelope) - } - } + r.runner.Start() } func (r *RelayService) Stop() { - r.quit <- true - r.node.Broadcaster().Unregister(r.ch) - close(r.ch) + r.runner.Stop() } func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { diff --git a/waku/v2/rpc/runner.go b/waku/v2/rpc/runner.go new file mode 100644 index 00000000..82f7ebff --- /dev/null +++ b/waku/v2/rpc/runner.go @@ -0,0 +1,43 @@ +package rpc + +import ( + v2 "github.com/status-im/go-waku/waku/v2" + "github.com/status-im/go-waku/waku/v2/protocol" +) + +type Adder func(msg *protocol.Envelope) + +type runnerService struct { + broadcaster v2.Broadcaster + ch chan *protocol.Envelope + quit chan bool + adder Adder +} + +func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService { + return &runnerService{ + broadcaster: broadcaster, + quit: make(chan bool), + adder: adder, + } +} + +func (r *runnerService) Start() { + r.ch = make(chan *protocol.Envelope, 1024) + r.broadcaster.Register(r.ch) + + for { + select { + case <-r.quit: + return + case envelope := <-r.ch: + r.adder(envelope) + } + } +} + +func (r *runnerService) Stop() { + r.quit <- true + r.broadcaster.Unregister(r.ch) + close(r.ch) +}