From 50e0e803ebd020224b87c9949c891bd15c81b272 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 7 Feb 2023 18:26:07 -0400 Subject: [PATCH] fix: concurrent write on relay rpc/rest --- waku/v2/rest/relay.go | 2 ++ waku/v2/rpc/relay.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/waku/v2/rest/relay.go b/waku/v2/rest/relay.go index 24e85574..9c73796b 100644 --- a/waku/v2/rest/relay.go +++ b/waku/v2/rest/relay.go @@ -71,11 +71,13 @@ func (r *RelayService) Start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) r.cancel = cancel + r.messagesMutex.Lock() // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these for _, topic := range r.node.Relay().Topics() { r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) r.messages[topic] = []*pb.WakuMessage{} } + r.messagesMutex.Unlock() r.runner.Start(ctx) } diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 46e4c527..92619ffe 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -67,11 +67,13 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { } func (r *RelayService) Start() { + r.messagesMutex.Lock() // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these for _, topic := range r.node.Relay().Topics() { r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) r.messages[topic] = make([]*pb.WakuMessage, 0) } + r.messagesMutex.Unlock() r.runner.Start() }