fix: relay unsub issue (#924)

co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Prem Chaitanya Prathi 2023-11-24 06:51:37 +05:30 committed by GitHub
parent fb49752f0f
commit b59a498606
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -173,16 +173,19 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
if !ok { // Joins topic if node hasn't joined yet
err := w.pubsub.RegisterTopicValidator(topic, w.topicValidator(topic))
if err != nil {
w.log.Error("failed to register topic validator", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}
newTopic, err := w.pubsub.Join(string(topic))
if err != nil {
w.log.Error("failed to join pubsubTopic", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}
err = newTopic.SetScoreParams(w.topicParams)
if err != nil {
w.log.Error("failed to set score params", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}
@ -199,11 +202,13 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
func (w *WakuRelay) subscribeToPubsubTopic(topic string) (*pubsubTopicSubscriptionDetails, error) {
w.topicsMutex.Lock()
defer w.topicsMutex.Unlock()
w.log.Info("subscribing to underlying pubsubTopic", zap.String("pubsubTopic", topic))
result, ok := w.topics[topic]
if !ok {
pubSubTopic, err := w.upsertTopic(topic)
if err != nil {
w.log.Error("failed to upsert topic", zap.String("pubsubTopic", topic), zap.Error(err))
return nil, err
}
@ -330,6 +335,7 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo
func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
w.log.Error("failed to derive pubsubTopic", zap.Error(err), zap.String("contentTopic", contentTopic))
return nil, err
}
contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic)
@ -371,6 +377,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
for _, opt := range optList {
err := opt(params)
if err != nil {
w.log.Error("failed to apply option", zap.Error(err))
return nil, err
}
}
@ -389,6 +396,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
_, err := w.subscribeToPubsubTopic(cFilter.PubsubTopic)
if err != nil {
//TODO: Handle partial errors.
w.log.Error("failed to subscribe to pubsubTopic", zap.Error(err), zap.String("pubsubTopic", cFilter.PubsubTopic))
return nil, err
}
}
@ -425,6 +433,8 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
if err != nil {
w.log.Error("failed to derive pubsubTopic from contentFilter", zap.String("pubsubTopic", contentFilter.PubsubTopic),
zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
return err
}
@ -436,6 +446,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
pubsubUnsubscribe := false
sub, ok := w.topics[pubSubTopic]
if !ok {
w.log.Error("not subscribed to topic", zap.String("topic", pubSubTopic))
return errors.New("not subscribed to topic")
}
@ -475,7 +486,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptionDetails) error {
pubSubTopic := topicData.subscription.Topic()
w.log.Info("unsubscribing from topic", zap.String("topic", pubSubTopic))
w.log.Info("unsubscribing from pubsubTopic", zap.String("topic", pubSubTopic))
topicData.subscription.Cancel()
topicData.topicEventHandler.Cancel()
@ -484,11 +495,18 @@ func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptio
err := topicData.topic.Close()
if err != nil {
w.log.Error("failed to close the pubsubTopic", zap.String("topic", pubSubTopic))
return err
}
w.RemoveTopicValidator(pubSubTopic)
err = w.pubsub.UnregisterTopicValidator(pubSubTopic)
if err != nil {
w.log.Error("failed to unregister topic validator", zap.String("topic", pubSubTopic))
return err
}
delete(w.topics, pubSubTopic)
return w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{pubSubTopic})