refactor: runnable service

This commit is contained in:
Anthony Laibe 2021-11-22 15:48:32 +01:00
parent 74c0f648f2
commit 28487873b6
4 changed files with 58 additions and 38 deletions

View File

@ -17,8 +17,7 @@ type FilterService struct {
messages map[string][]*pb.WakuMessage
messagesMutex sync.RWMutex
ch chan *protocol.Envelope
quit chan bool
runner *runnerService
}
type FilterContentArgs struct {
@ -31,11 +30,12 @@ type ContentTopicArgs struct {
}
func NewFilterService(node *node.WakuNode) *FilterService {
return &FilterService{
s := &FilterService{
node: node,
messages: make(map[string][]*pb.WakuMessage),
quit: make(chan bool),
}
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
return s
}
func makeContentFilter(args *FilterContentArgs) filter.ContentFilter {
@ -63,23 +63,11 @@ func (f *FilterService) addEnvelope(envelope *protocol.Envelope) {
}
func (f *FilterService) Start() {
f.ch = make(chan *protocol.Envelope, 1024)
f.node.Broadcaster().Register(f.ch)
for {
select {
case <-f.quit:
return
case envelope := <-f.ch:
f.addEnvelope(envelope)
}
}
f.runner.Start()
}
func (f *FilterService) Stop() {
f.quit <- true
f.node.Broadcaster().Unregister(f.ch)
close(f.ch)
f.runner.Stop()
}
func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {

View File

@ -115,10 +115,10 @@ func TestFilterGetV1Messages(t *testing.T) {
// Wait for the subscription to be started
time.Sleep(1 * time.Second)
_, err = serviceA.node.Relay().Publish(
_, err = serviceA.node.Relay().PublishToTopic(
context.Background(),
&pb.WakuMessage{ContentTopic: "ct"},
(*relay.Topic)(&testTopic),
testTopic,
)
require.NoError(t, err)
require.True(t, reply.Success)

View File

@ -16,8 +16,7 @@ type RelayService struct {
messages map[string][]*pb.WakuMessage
messagesMutex sync.RWMutex
ch chan *protocol.Envelope
quit chan bool
runner *runnerService
}
type RelayMessageArgs struct {
@ -34,11 +33,13 @@ type TopicArgs struct {
}
func NewRelayService(node *node.WakuNode) *RelayService {
return &RelayService{
s := &RelayService{
node: node,
messages: make(map[string][]*pb.WakuMessage),
quit: make(chan bool),
}
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
return s
}
func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
@ -53,23 +54,11 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
}
func (r *RelayService) Start() {
r.ch = make(chan *protocol.Envelope, 1024)
r.node.Broadcaster().Register(r.ch)
for {
select {
case <-r.quit:
return
case envelope := <-r.ch:
r.addEnvelope(envelope)
}
}
r.runner.Start()
}
func (r *RelayService) Stop() {
r.quit <- true
r.node.Broadcaster().Unregister(r.ch)
close(r.ch)
r.runner.Stop()
}
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {

43
waku/v2/rpc/runner.go Normal file
View File

@ -0,0 +1,43 @@
package rpc
import (
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/protocol"
)
type Adder func(msg *protocol.Envelope)
type runnerService struct {
broadcaster v2.Broadcaster
ch chan *protocol.Envelope
quit chan bool
adder Adder
}
func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService {
return &runnerService{
broadcaster: broadcaster,
quit: make(chan bool),
adder: adder,
}
}
func (r *runnerService) Start() {
r.ch = make(chan *protocol.Envelope, 1024)
r.broadcaster.Register(r.ch)
for {
select {
case <-r.quit:
return
case envelope := <-r.ch:
r.adder(envelope)
}
}
}
func (r *runnerService) Stop() {
r.quit <- true
r.broadcaster.Unregister(r.ch)
close(r.ch)
}