diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index d80e0073..0da7cc0f 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -91,7 +91,7 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ } else { topicToSubscribe = topic } - _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) @@ -126,7 +126,16 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { } var response []*pb.WakuMessage select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) + w.WriteHeader(http.StatusNotFound) + _, err = w.Write([]byte("consume channel is closed for subscription")) + if err != nil { + r.log.Error("writing response", zap.Error(err)) + } + return + } response = append(response, msg.Message()) default: break diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 2e24c7c1..5c7245b4 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -1,6 +1,7 @@ package rpc import ( + "errors" "fmt" "net/http" @@ -12,6 +13,8 @@ import ( "go.uber.org/zap" ) +var errChannelClosed = errors.New("consume channel is closed for subscription") + // RelayService represents the JSON RPC service for WakuRelay type RelayService struct { node *node.WakuNode @@ -93,7 +96,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, // Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics. func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...)) + _, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...), relay.WithCacheSize(uint(r.cacheCapacity))) if err != nil { r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err)) return err @@ -150,7 +153,11 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep return err } select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) + return errChannelClosed + } rpcMsg, err := ProtoToRPC(msg.Message()) if err != nil { r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err)) @@ -165,14 +172,13 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep // PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { - ctx := req.Context() for _, topic := range args.Topics { var err error if topic == "" { topic = relay.DefaultWakuTopic } - _, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topic), relay.WithCacheSize(uint(r.cacheCapacity))) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err @@ -207,7 +213,11 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply * return err } select { - case msg := <-sub.Ch: + case msg, open := <-sub.Ch: + if !open { + r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic)) + return errChannelClosed + } m, err := ProtoToRPC(msg.Message()) if err == nil { *reply = append(*reply, m) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 2bbef61a..88726028 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -292,6 +292,9 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { + if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned + continue + } return sub, nil } } @@ -308,6 +311,9 @@ func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { + if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned + continue + } return sub, nil } }