diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 3342dc29..5649a3c9 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -242,7 +242,7 @@ func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID, } func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, - reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peer peer.ID) error { + reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peerID peer.ID) error { request := &pb.FilterSubscribeRequest{ RequestId: hex.EncodeToString(requestID), FilterSubscribeType: reqType, @@ -255,11 +255,14 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, return err } - logger := wf.log.With(logging.HostID("peerID", peer)) + logger := wf.log.With(logging.HostID("peerID", peerID)) - stream, err := wf.h.NewStream(ctx, peer, FilterSubscribeID_v20beta1) + stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) + if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return err } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 2bf63bb5..3d898f89 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -273,6 +274,9 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(pushTimeoutFailure) } else { wf.metrics.RecordError(dialFailure) + if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } } logger.Error("opening peer stream", zap.Error(err)) return err diff --git a/waku/v2/protocol/legacy_store/waku_store_client.go b/waku/v2/protocol/legacy_store/waku_store_client.go index b02cd92e..456dada5 100644 --- a/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/waku/v2/protocol/legacy_store/waku_store_client.go @@ -207,6 +207,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor if err != nil { logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) + if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + } return nil, err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index de7bce8b..10028fdd 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -188,14 +188,17 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu } // request sends a message via lightPush protocol to either a specified peer or peer that is selected. -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peerID peer.ID) (*pb.PushResponse, error) { - logger := wakuLP.log.With(logging.HostID("peer", peer)) + logger := wakuLP.log.With(logging.HostID("peer", peerID)) - stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1) + stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) + if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return nil, err } pushRequestRPC := &pb.PushRpc{RequestId: hex.EncodeToString(params.requestID), Request: req} diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 23a7e455..dc7c44e5 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb" @@ -103,6 +104,9 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak stream, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) + if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + } return nil, err } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 915ce75f..6baf3095 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -76,6 +76,9 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { + if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer}) + } return err } diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 34a08bcf..5cda4eef 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -253,6 +253,9 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) + if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { + ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + } return nil, err }