From 1c75c89790982c6252c499a7576e141e6a59780b Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 24 May 2023 17:06:08 -0400 Subject: [PATCH] refactor: remove unneeded `Connect` as dns4 addresses are being resolved now --- waku/v2/protocol/filter/client.go | 9 +------- waku/v2/protocol/filter/server.go | 13 ----------- waku/v2/protocol/legacy_filter/waku_filter.go | 22 ------------------- waku/v2/protocol/lightpush/waku_lightpush.go | 7 ------ waku/v2/protocol/peer_exchange/client.go | 6 ----- waku/v2/protocol/store/waku_store_client.go | 8 ------- 6 files changed, 1 insertion(+), 64 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 15922d10..24b7b8b6 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -154,14 +154,7 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, } func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { - err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer)) - if err != nil { - metrics.RecordFilterError(ctx, "dial_failure") - return err - } - - var conn network.Stream - conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) + conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { metrics.RecordFilterError(ctx, "dial_failure") return err diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 9181f3b2..1e8fd50f 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -285,19 +285,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e ctx, cancel := context.WithTimeout(ctx, MessagePushTimeout) defer cancel() - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peerID)) - if err != nil { - wf.subscriptions.FlagAsFailure(peerID) - if errors.Is(context.DeadlineExceeded, err) { - metrics.RecordFilterError(ctx, "push_timeout_failure") - } else { - metrics.RecordFilterError(ctx, "dial_failure") - } - logger.Error("connecting to peer", zap.Error(err)) - return err - } - conn, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1) if err != nil { wf.subscriptions.FlagAsFailure(peerID) diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 9ca46b4d..85f9c8c2 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -171,15 +171,6 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, ms pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*wpb.WakuMessage{msg}}} logger := wf.log.With(logging.HostID("peer", subscriber.peer)) - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(subscriber.peer)) - if err != nil { - wf.subscribers.FlagAsFailure(subscriber.peer) - logger.Error("connecting to peer", zap.Error(err)) - metrics.RecordLegacyFilterError(ctx, "dial_failure") - return err - } - conn, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1) if err != nil { wf.subscribers.FlagAsFailure(subscriber.peer) @@ -269,13 +260,6 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) } - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err = wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer)) - if err != nil { - metrics.RecordLegacyFilterError(ctx, "dial_failure") - return - } - request := &pb.FilterRequest{ Subscribe: true, Topic: filter.Topic, @@ -313,12 +297,6 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error { - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer)) - if err != nil { - metrics.RecordLegacyFilterError(ctx, "dial_failure") - return err - } conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) if err != nil { diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 385e34e1..85af5d40 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -157,13 +157,6 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o } logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer)) - if err != nil { - metrics.RecordLightpushError(ctx, "dial_failure") - logger.Error("connecting peer", zap.Error(err)) - return nil, err - } connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) if err != nil { diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 9f151c97..d5bf9d06 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -38,12 +38,6 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts }, } - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(params.selectedPeer)) - if err != nil { - return err - } - connOpt, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { return err diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 47357b98..d2c957b4 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -176,14 +176,6 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec logger := store.log.With(logging.HostID("peer", selectedPeer)) logger.Info("querying message history") - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer)) - if err != nil { - logger.Error("connecting to peer", zap.Error(err)) - metrics.RecordStoreError(store.ctx, "dial_failure") - return nil, err - } - connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { logger.Error("creating stream to peer", zap.Error(err))