diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index c29a2b93..dd7fbae9 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -799,6 +799,17 @@ func (w *WakuNode) ClosePeerByAddress(address string) error { return w.ClosePeerById(info.ID) } +func (w *WakuNode) DisconnectAllPeers() { + w.host.Network().StopNotify(w.connectionNotif) + for _, peerID := range w.host.Network().Peers() { + err := w.ClosePeerById(peerID) + if err != nil { + w.log.Info("failed to close peer", zap.Stringer("peer", peerID), zap.Error(err)) + } + } + w.host.Network().Notify(w.connectionNotif) +} + // ClosePeerById is used to close a connection to a peer func (w *WakuNode) ClosePeerById(id peer.ID) error { err := w.host.Network().ClosePeer(id) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 5649a3c9..52b4efa6 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -147,17 +147,6 @@ func (wf *WakuFilterLightNode) Stop() { }) } -func (wf *WakuFilterLightNode) unsubscribeWithoutSubscription(cf protocol.ContentFilter, peerID peer.ID) { - err := wf.request( - wf.Context(), - protocol.GenerateRequestID(), - pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, - cf, peerID) - if err != nil { - wf.log.Warn("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) - } -} - func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { peerID := stream.Conn().RemotePeer() @@ -168,8 +157,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us - //This could be happening due to https://github.com/waku-org/go-waku/issues/1124 - go wf.unsubscribeWithoutSubscription(protocol.ContentFilter{}, peerID) if err := stream.Reset(); err != nil { wf.log.Error("resetting connection", zap.Error(err)) } @@ -216,8 +203,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea cf := protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic) if !wf.subscriptions.Has(peerID, cf) { logger.Warn("received messagepush with invalid subscription parameters") - //Unsubscribe from that peer for the contentTopic, possibly due to https://github.com/waku-org/go-waku/issues/1124 - go wf.unsubscribeWithoutSubscription(cf, peerID) wf.metrics.RecordError(invalidSubscriptionMessage) return }