diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 9d09c817..d4aa3384 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -389,52 +389,54 @@ func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subsc node.bcaster.Register(subscription.C) - go func(t relay.Topic) { - nextMsgTicker := time.NewTicker(time.Millisecond * 10) - defer nextMsgTicker.Stop() - - ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay")) - if err != nil { - log.Error(err) - return - } - - for { - select { - case <-subscription.quit: - subscription.mutex.Lock() - node.bcaster.Unregister(subscription.C) // Remove from broadcast list - close(subscription.C) - subscription.mutex.Unlock() - case <-nextMsgTicker.C: - msg, err := sub.Next(ctx) - if err != nil { - subscription.mutex.Lock() - for _, subscription := range node.subscriptions[t] { - subscription.Unsubscribe() - } - subscription.mutex.Unlock() - return - } - - stats.Record(ctx, metrics.Messages.M(1)) - - wakuMessage := &pb.WakuMessage{} - if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - log.Error("could not decode message", err) - return - } - - envelope := protocol.NewEnvelope(wakuMessage, string(t)) - - node.bcaster.Submit(envelope) - } - } - }(t) + go node.subscribeToTopic(t, subscription, sub) return subscription, nil } +func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription, sub *pubsub.Subscription) { + nextMsgTicker := time.NewTicker(time.Millisecond * 10) + defer nextMsgTicker.Stop() + + ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay")) + if err != nil { + log.Error(err) + return + } + + for { + select { + case <-subscription.quit: + subscription.mutex.Lock() + node.bcaster.Unregister(subscription.C) // Remove from broadcast list + close(subscription.C) + subscription.mutex.Unlock() + case <-nextMsgTicker.C: + msg, err := sub.Next(ctx) + if err != nil { + subscription.mutex.Lock() + for _, subscription := range node.subscriptions[t] { + subscription.Unsubscribe() + } + subscription.mutex.Unlock() + return + } + + stats.Record(ctx, metrics.Messages.M(1)) + + wakuMessage := &pb.WakuMessage{} + if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { + log.Error("could not decode message", err) + return + } + + envelope := protocol.NewEnvelope(wakuMessage, string(t)) + + node.bcaster.Submit(envelope) + } + } +} + // Wrapper around WakuFilter.Subscribe // that adds a Filter object to node.filters // TODO: what's up with this channel?.......................... is it closed eventually?