From 578b40a44e9457808200806f1d5651662605f71f Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sun, 10 Oct 2021 18:52:16 -0400 Subject: [PATCH] refactor: extract function to subscribe to topic --- waku/v2/node/wakunode2.go | 86 ++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 9d09c817..d4aa3384 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -389,52 +389,54 @@ func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subsc node.bcaster.Register(subscription.C) - go func(t relay.Topic) { - nextMsgTicker := time.NewTicker(time.Millisecond * 10) - defer nextMsgTicker.Stop() - - ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay")) - if err != nil { - log.Error(err) - return - } - - for { - select { - case <-subscription.quit: - subscription.mutex.Lock() - node.bcaster.Unregister(subscription.C) // Remove from broadcast list - close(subscription.C) - subscription.mutex.Unlock() - case <-nextMsgTicker.C: - msg, err := sub.Next(ctx) - if err != nil { - subscription.mutex.Lock() - for _, subscription := range node.subscriptions[t] { - subscription.Unsubscribe() - } - subscription.mutex.Unlock() - return - } - - stats.Record(ctx, metrics.Messages.M(1)) - - wakuMessage := &pb.WakuMessage{} - if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - log.Error("could not decode message", err) - return - } - - envelope := protocol.NewEnvelope(wakuMessage, string(t)) - - node.bcaster.Submit(envelope) - } - } - }(t) + go node.subscribeToTopic(t, subscription, sub) return subscription, nil } +func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription, sub *pubsub.Subscription) { + nextMsgTicker := time.NewTicker(time.Millisecond * 10) + defer nextMsgTicker.Stop() + + ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay")) + if err != nil { + log.Error(err) + return + } + + for { + select { + case <-subscription.quit: + subscription.mutex.Lock() + node.bcaster.Unregister(subscription.C) // Remove from broadcast list + close(subscription.C) + subscription.mutex.Unlock() + case <-nextMsgTicker.C: + msg, err := sub.Next(ctx) + if err != nil { + subscription.mutex.Lock() + for _, subscription := range node.subscriptions[t] { + subscription.Unsubscribe() + } + subscription.mutex.Unlock() + return + } + + stats.Record(ctx, metrics.Messages.M(1)) + + wakuMessage := &pb.WakuMessage{} + if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { + log.Error("could not decode message", err) + return + } + + envelope := protocol.NewEnvelope(wakuMessage, string(t)) + + node.bcaster.Submit(envelope) + } + } +} + // Wrapper around WakuFilter.Subscribe // that adds a Filter object to node.filters // TODO: what's up with this channel?.......................... is it closed eventually?