diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 97f12234..b9e15238 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -173,16 +173,19 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { if !ok { // Joins topic if node hasn't joined yet err := w.pubsub.RegisterTopicValidator(topic, w.topicValidator(topic)) if err != nil { + w.log.Error("failed to register topic validator", zap.String("pubsubTopic", topic), zap.Error(err)) return nil, err } newTopic, err := w.pubsub.Join(string(topic)) if err != nil { + w.log.Error("failed to join pubsubTopic", zap.String("pubsubTopic", topic), zap.Error(err)) return nil, err } err = newTopic.SetScoreParams(w.topicParams) if err != nil { + w.log.Error("failed to set score params", zap.String("pubsubTopic", topic), zap.Error(err)) return nil, err } @@ -199,11 +202,13 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { func (w *WakuRelay) subscribeToPubsubTopic(topic string) (*pubsubTopicSubscriptionDetails, error) { w.topicsMutex.Lock() defer w.topicsMutex.Unlock() + w.log.Info("subscribing to underlying pubsubTopic", zap.String("pubsubTopic", topic)) result, ok := w.topics[topic] if !ok { pubSubTopic, err := w.upsertTopic(topic) if err != nil { + w.log.Error("failed to upsert topic", zap.String("pubsubTopic", topic), zap.Error(err)) return nil, err } @@ -330,6 +335,7 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) { pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) if err != nil { + w.log.Error("failed to derive pubsubTopic", zap.Error(err), zap.String("contentTopic", contentTopic)) return nil, err } contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic) @@ -371,6 +377,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont for _, opt := range optList { err := opt(params) if err != nil { + w.log.Error("failed to apply option", zap.Error(err)) return nil, err } } @@ -389,6 +396,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont _, err := w.subscribeToPubsubTopic(cFilter.PubsubTopic) if err != nil { //TODO: Handle partial errors. + w.log.Error("failed to subscribe to pubsubTopic", zap.Error(err), zap.String("pubsubTopic", cFilter.PubsubTopic)) return nil, err } } @@ -425,6 +433,8 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter) if err != nil { + w.log.Error("failed to derive pubsubTopic from contentFilter", zap.String("pubsubTopic", contentFilter.PubsubTopic), + zap.Strings("contentTopics", contentFilter.ContentTopicsList())) return err } @@ -436,6 +446,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co pubsubUnsubscribe := false sub, ok := w.topics[pubSubTopic] if !ok { + w.log.Error("not subscribed to topic", zap.String("topic", pubSubTopic)) return errors.New("not subscribed to topic") } @@ -475,7 +486,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptionDetails) error { pubSubTopic := topicData.subscription.Topic() - w.log.Info("unsubscribing from topic", zap.String("topic", pubSubTopic)) + w.log.Info("unsubscribing from pubsubTopic", zap.String("topic", pubSubTopic)) topicData.subscription.Cancel() topicData.topicEventHandler.Cancel() @@ -484,11 +495,18 @@ func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptio err := topicData.topic.Close() if err != nil { + w.log.Error("failed to close the pubsubTopic", zap.String("topic", pubSubTopic)) return err } w.RemoveTopicValidator(pubSubTopic) + err = w.pubsub.UnregisterTopicValidator(pubSubTopic) + if err != nil { + w.log.Error("failed to unregister topic validator", zap.String("topic", pubSubTopic)) + return err + } + delete(w.topics, pubSubTopic) return w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{pubSubTopic})