refactor: extract function to subscribe to topic

This commit is contained in:
Richard Ramos 2021-10-10 18:52:16 -04:00
parent 13aee0b1e0
commit 578b40a44e
1 changed files with 44 additions and 42 deletions

View File

@ -389,52 +389,54 @@ func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subsc
node.bcaster.Register(subscription.C) node.bcaster.Register(subscription.C)
go func(t relay.Topic) { go node.subscribeToTopic(t, subscription, sub)
nextMsgTicker := time.NewTicker(time.Millisecond * 10)
defer nextMsgTicker.Stop()
ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay"))
if err != nil {
log.Error(err)
return
}
for {
select {
case <-subscription.quit:
subscription.mutex.Lock()
node.bcaster.Unregister(subscription.C) // Remove from broadcast list
close(subscription.C)
subscription.mutex.Unlock()
case <-nextMsgTicker.C:
msg, err := sub.Next(ctx)
if err != nil {
subscription.mutex.Lock()
for _, subscription := range node.subscriptions[t] {
subscription.Unsubscribe()
}
subscription.mutex.Unlock()
return
}
stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
log.Error("could not decode message", err)
return
}
envelope := protocol.NewEnvelope(wakuMessage, string(t))
node.bcaster.Submit(envelope)
}
}
}(t)
return subscription, nil return subscription, nil
} }
func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription, sub *pubsub.Subscription) {
nextMsgTicker := time.NewTicker(time.Millisecond * 10)
defer nextMsgTicker.Stop()
ctx, err := tag.New(node.ctx, tag.Insert(metrics.KeyType, "relay"))
if err != nil {
log.Error(err)
return
}
for {
select {
case <-subscription.quit:
subscription.mutex.Lock()
node.bcaster.Unregister(subscription.C) // Remove from broadcast list
close(subscription.C)
subscription.mutex.Unlock()
case <-nextMsgTicker.C:
msg, err := sub.Next(ctx)
if err != nil {
subscription.mutex.Lock()
for _, subscription := range node.subscriptions[t] {
subscription.Unsubscribe()
}
subscription.mutex.Unlock()
return
}
stats.Record(ctx, metrics.Messages.M(1))
wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
log.Error("could not decode message", err)
return
}
envelope := protocol.NewEnvelope(wakuMessage, string(t))
node.bcaster.Submit(envelope)
}
}
}
// Wrapper around WakuFilter.Subscribe // Wrapper around WakuFilter.Subscribe
// that adds a Filter object to node.filters // that adds a Filter object to node.filters
// TODO: what's up with this channel?.......................... is it closed eventually? // TODO: what's up with this channel?.......................... is it closed eventually?