diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 07af6c79..38088001 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -751,7 +751,12 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) - w.peerConnector.HandleDialError(err, info.ID) + if err != nil { + if w.peermanager != nil { + w.peermanager.HandleDialError(err, info.ID) + } + return err + } for _, addr := range info.Addrs { // TODO: this is a temporary fix diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index e1c071c7..ac130ef0 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -4,7 +4,6 @@ package peermanager import ( "context" - "errors" "math/rand" "sync" "sync/atomic" @@ -19,9 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/service" - "github.com/waku-org/go-waku/waku/v2/utils" - "github.com/libp2p/go-libp2p/core/event" "go.uber.org/zap" lru "github.com/hashicorp/golang-lru" @@ -41,9 +38,8 @@ type PeerConnectionStrategy struct { *service.CommonDiscoveryService subscriptions []subscription - backoff backoff.BackoffFactory - logger *zap.Logger - evtDialError event.Emitter + backoff backoff.BackoffFactory + logger *zap.Logger } type subscription struct { @@ -159,11 +155,6 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) { // Start attempts to connect to the peers passed in by peerCh. // Will not connect to peers if they are within the backoff period. func (c *PeerConnectionStrategy) Start(ctx context.Context) error { - var err error - c.evtDialError, err = c.host.EventBus().Emitter(new(utils.DialError)) - if err != nil { - return err - } return c.CommonDiscoveryService.Start(ctx, c.start) } @@ -179,9 +170,7 @@ func (c *PeerConnectionStrategy) start() error { // Stop terminates the peer-connector func (c *PeerConnectionStrategy) Stop() { - c.CommonDiscoveryService.Stop(func() { - c.evtDialError.Close() - }) + c.CommonDiscoveryService.Stop(func() {}) } func (c *PeerConnectionStrategy) isPaused() bool { @@ -287,19 +276,10 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout) defer cancel() err := c.host.Connect(ctx, pi) - c.HandleDialError(err, pi.ID) - c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) + if err != nil { + c.pm.HandleDialError(err, pi.ID) + } else { + c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) + } <-sem } - -func (c *PeerConnectionStrategy) HandleDialError(err error, peerID peer.ID) { - if err != nil && !errors.Is(err, context.Canceled) { - c.addConnectionBackoff(peerID) - c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID) - c.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err)) - emitterErr := c.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID}) - if emitterErr != nil { - c.logger.Error("failed to emit DialError", zap.Error(emitterErr)) - } - } -} diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 2ac489a0..d548923d 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/metadata" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -87,6 +88,7 @@ type PeerManager struct { TopicHealthNotifCh chan<- TopicHealthStatus rttCache *FastestPeerSelector RelayEnabled bool + evtDialError event.Emitter } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -249,6 +251,14 @@ func (pm *PeerManager) Start(ctx context.Context) { go pm.connectivityLoop(ctx) } go pm.peerStoreLoop(ctx) + + if pm.host != nil { + var err error + pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError)) + if err != nil { + pm.logger.Error("failed to create dial error emitter", zap.Error(err)) + } + } } func (pm *PeerManager) peerStoreLoop(ctx context.Context) { @@ -719,3 +729,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) } + +func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { + if err == nil || errors.Is(err, context.Canceled) { + return + } + if pm.peerConnector != nil { + pm.peerConnector.addConnectionBackoff(peerID) + } + if pm.host != nil { + pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID) + } + pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err)) + if pm.evtDialError != nil { + emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID}) + if emitterErr != nil { + pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) + } + } +} diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index a9d2b496..41b49784 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -245,8 +245,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } return err } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 9a2e25d6..2b17de4b 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -14,7 +14,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -38,6 +38,7 @@ type ( log *zap.Logger *service.CommonService subscriptions *SubscribersMap + pm *peermanager.PeerManager maxSubscriptions int } @@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi wf.maxSubscriptions = params.MaxSubscribers if params.pm != nil { params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField) + wf.pm = params.pm } return wf } @@ -274,8 +276,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(pushTimeoutFailure) } else { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } } logger.Error("opening peer stream", zap.Error(err)) diff --git a/waku/v2/protocol/legacy_store/waku_store_client.go b/waku/v2/protocol/legacy_store/waku_store_client.go index 03f7c9b2..ef971f00 100644 --- a/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/waku/v2/protocol/legacy_store/waku_store_client.go @@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) - if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + if store.pm != nil { + store.pm.HandleDialError(err, selectedPeer) } return nil, err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8200fddf..f0c005a6 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) - if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wakuLP.pm != nil { + wakuLP.pm.HandleDialError(err, peerID) } return nil, err } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index f901590d..8a9d8e12 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -76,8 +76,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { - if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(params.selectedPeer) + if wakuPX.pm != nil { + wakuPX.pm.HandleDialError(err, params.selectedPeer) } return err } diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 3398c4bf..f7427b97 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -281,9 +281,8 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) - if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(params.selectedPeer) + if s.pm != nil { + s.pm.HandleDialError(err, params.selectedPeer) } return nil, err }