refactor: remove quit channel from rpc/rest

This commit is contained in:
Richard Ramos 2022-12-10 12:21:22 -04:00 committed by RichΛrd
parent 7366e0d29d
commit b9f47e8982
5 changed files with 24 additions and 16 deletions

View File

@ -370,7 +370,7 @@ func Execute(options Options) {
if options.RESTServer.Enable { if options.RESTServer.Enable {
wg.Add(1) wg.Add(1)
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.RESTServer.Admin, options.RESTServer.Private, options.RESTServer.RelayCacheCapacity, logger) restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.RESTServer.Admin, options.RESTServer.Private, options.RESTServer.RelayCacheCapacity, logger)
restServer.Start(&wg) restServer.Start(ctx, &wg)
} }
wg.Wait() wg.Wait()

View File

@ -1,6 +1,7 @@
package rest package rest
import ( import (
"context"
"encoding/json" "encoding/json"
"net/http" "net/http"
"strings" "strings"
@ -18,8 +19,9 @@ const ROUTE_RELAY_SUBSCRIPTIONSV1 = "/relay/v1/subscriptions"
const ROUTE_RELAY_MESSAGESV1 = "/relay/v1/messages/{topic}" const ROUTE_RELAY_MESSAGESV1 = "/relay/v1/messages/{topic}"
type RelayService struct { type RelayService struct {
node *node.WakuNode node *node.WakuNode
mux *mux.Router mux *mux.Router
cancel context.CancelFunc
log *zap.Logger log *zap.Logger
@ -65,18 +67,21 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
} }
func (r *RelayService) Start() { func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() { for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = []*pb.WakuMessage{} r.messages[topic] = []*pb.WakuMessage{}
} }
r.runner.Start() r.runner.Start(ctx)
} }
func (r *RelayService) Stop() { func (r *RelayService) Stop() {
r.runner.Stop() r.cancel()
} }
func (d *RelayService) deleteV1Subscriptions(w http.ResponseWriter, r *http.Request) { func (d *RelayService) deleteV1Subscriptions(w http.ResponseWriter, r *http.Request) {

View File

@ -54,7 +54,7 @@ func TestPostV1Message(t *testing.T) {
func TestRelaySubscription(t *testing.T) { func TestRelaySubscription(t *testing.T) {
d := makeRelayService(t) d := makeRelayService(t)
go d.Start() go d.Start(context.Background())
defer d.Stop() defer d.Stop()
topics := []string{"test"} topics := []string{"test"}
@ -96,10 +96,10 @@ func TestRelaySubscription(t *testing.T) {
func TestRelayGetV1Messages(t *testing.T) { func TestRelayGetV1Messages(t *testing.T) {
serviceA := makeRelayService(t) serviceA := makeRelayService(t)
go serviceA.Start() go serviceA.Start(context.Background())
defer serviceA.Stop() defer serviceA.Stop()
serviceB := makeRelayService(t) serviceB := makeRelayService(t)
go serviceB.Start() go serviceB.Start(context.Background())
defer serviceB.Stop() defer serviceB.Stop()
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty()))

View File

@ -1,6 +1,8 @@
package rest package rest
import ( import (
"context"
v2 "github.com/waku-org/go-waku/waku/v2" v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
) )
@ -10,24 +12,25 @@ type Adder func(msg *protocol.Envelope)
type runnerService struct { type runnerService struct {
broadcaster v2.Broadcaster broadcaster v2.Broadcaster
ch chan *protocol.Envelope ch chan *protocol.Envelope
quit chan bool cancel context.CancelFunc
adder Adder adder Adder
} }
func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService { func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService {
return &runnerService{ return &runnerService{
broadcaster: broadcaster, broadcaster: broadcaster,
quit: make(chan bool),
adder: adder, adder: adder,
} }
} }
func (r *runnerService) Start() { func (r *runnerService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.ch = make(chan *protocol.Envelope, 1024) r.ch = make(chan *protocol.Envelope, 1024)
r.cancel = cancel
r.broadcaster.Register(nil, r.ch) r.broadcaster.Register(nil, r.ch)
for { for {
select { select {
case <-r.quit: case <-ctx.Done():
return return
case envelope := <-r.ch: case envelope := <-r.ch:
r.adder(envelope) r.adder(envelope)
@ -36,7 +39,7 @@ func (r *runnerService) Start() {
} }
func (r *runnerService) Stop() { func (r *runnerService) Stop() {
r.quit <- true r.cancel()
r.broadcaster.Unregister(nil, r.ch) r.broadcaster.Unregister(nil, r.ch)
close(r.ch) close(r.ch)
} }

View File

@ -47,9 +47,9 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enableAdmin bool
return wrpc return wrpc
} }
func (r *WakuRest) Start(wg *sync.WaitGroup) { func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
go r.relayService.Start() go r.relayService.Start(ctx)
go func() { go func() {
_ = r.server.ListenAndServe() _ = r.server.ListenAndServe()
}() }()