diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 10153fd6..38088001 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -752,7 +752,9 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { - w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID) + if w.peermanager != nil { + w.peermanager.HandleDialError(err, info.ID) + } return err } diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index bd844b20..ac130ef0 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -4,7 +4,6 @@ package peermanager import ( "context" - "errors" "math/rand" "sync" "sync/atomic" @@ -277,11 +276,10 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout) defer cancel() err := c.host.Connect(ctx, pi) - if err != nil && !errors.Is(err, context.Canceled) { - c.addConnectionBackoff(pi.ID) - c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID) - c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) + if err != nil { + c.pm.HandleDialError(err, pi.ID) + } else { + c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) } - c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) <-sem } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 2ac489a0..d548923d 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/metadata" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -87,6 +88,7 @@ type PeerManager struct { TopicHealthNotifCh chan<- TopicHealthStatus rttCache *FastestPeerSelector RelayEnabled bool + evtDialError event.Emitter } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -249,6 +251,14 @@ func (pm *PeerManager) Start(ctx context.Context) { go pm.connectivityLoop(ctx) } go pm.peerStoreLoop(ctx) + + if pm.host != nil { + var err error + pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError)) + if err != nil { + pm.logger.Error("failed to create dial error emitter", zap.Error(err)) + } + } } func (pm *PeerManager) peerStoreLoop(ctx context.Context) { @@ -719,3 +729,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) } + +func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { + if err == nil || errors.Is(err, context.Canceled) { + return + } + if pm.peerConnector != nil { + pm.peerConnector.addConnectionBackoff(peerID) + } + if pm.host != nil { + pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID) + } + pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err)) + if pm.evtDialError != nil { + emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID}) + if emitterErr != nil { + pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) + } + } +} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 046666ae..12dceef2 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -223,6 +223,7 @@ func TestConnectToRelayPeers(t *testing.T) { ctx, pm, deferFn := initTest(t) pc, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 120*time.Second, pm.logger) require.NoError(t, err) + pc.SetHost(pm.host) err = pc.Start(ctx) require.NoError(t, err) pm.Start(ctx) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index a9d2b496..41b49784 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -245,8 +245,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } return err } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 9a2e25d6..2b17de4b 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -14,7 +14,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -38,6 +38,7 @@ type ( log *zap.Logger *service.CommonService subscriptions *SubscribersMap + pm *peermanager.PeerManager maxSubscriptions int } @@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi wf.maxSubscriptions = params.MaxSubscribers if params.pm != nil { params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField) + wf.pm = params.pm } return wf } @@ -274,8 +276,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(pushTimeoutFailure) } else { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } } logger.Error("opening peer stream", zap.Error(err)) diff --git a/waku/v2/protocol/legacy_store/waku_store_client.go b/waku/v2/protocol/legacy_store/waku_store_client.go index 03f7c9b2..ef971f00 100644 --- a/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/waku/v2/protocol/legacy_store/waku_store_client.go @@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) - if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + if store.pm != nil { + store.pm.HandleDialError(err, selectedPeer) } return nil, err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8200fddf..f0c005a6 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) - if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wakuLP.pm != nil { + wakuLP.pm.HandleDialError(err, peerID) } return nil, err } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index f901590d..8a9d8e12 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -76,8 +76,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { - if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(params.selectedPeer) + if wakuPX.pm != nil { + wakuPX.pm.HandleDialError(err, params.selectedPeer) } return err } diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 3398c4bf..f7427b97 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -281,9 +281,8 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) - if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(params.selectedPeer) + if s.pm != nil { + s.pm.HandleDialError(err, params.selectedPeer) } return nil, err } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 8321dc3e..b732fa14 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -5,6 +5,11 @@ import ( "github.com/multiformats/go-multiaddr" ) +type DialError struct { + Err error + PeerID peer.ID +} + // GetPeerID is used to extract the peerID from a multiaddress func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)