From 8916615ba3d6b1c0262812455f789f3883d10d3d Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 17 Sep 2024 17:40:32 -0700 Subject: [PATCH] feat: emit an event in EventBus upon dial error --- waku/v2/node/wakunode2.go | 5 +--- waku/v2/peermanager/peer_connector.go | 34 ++++++++++++++++++------ waku/v2/peermanager/peer_manager_test.go | 1 + waku/v2/utils/peer.go | 5 ++++ 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 10153fd6..07af6c79 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -751,10 +751,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) - if err != nil { - w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID) - return err - } + w.peerConnector.HandleDialError(err, info.ID) for _, addr := range info.Addrs { // TODO: this is a temporary fix diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index bd844b20..e1c071c7 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -19,7 +19,9 @@ import ( "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/libp2p/go-libp2p/core/event" "go.uber.org/zap" lru "github.com/hashicorp/golang-lru" @@ -39,8 +41,9 @@ type PeerConnectionStrategy struct { *service.CommonDiscoveryService subscriptions []subscription - backoff backoff.BackoffFactory - logger *zap.Logger + backoff backoff.BackoffFactory + logger *zap.Logger + evtDialError event.Emitter } type subscription struct { @@ -156,6 +159,11 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) { // Start attempts to connect to the peers passed in by peerCh. // Will not connect to peers if they are within the backoff period. func (c *PeerConnectionStrategy) Start(ctx context.Context) error { + var err error + c.evtDialError, err = c.host.EventBus().Emitter(new(utils.DialError)) + if err != nil { + return err + } return c.CommonDiscoveryService.Start(ctx, c.start) } @@ -171,7 +179,9 @@ func (c *PeerConnectionStrategy) start() error { // Stop terminates the peer-connector func (c *PeerConnectionStrategy) Stop() { - c.CommonDiscoveryService.Stop(func() {}) + c.CommonDiscoveryService.Stop(func() { + c.evtDialError.Close() + }) } func (c *PeerConnectionStrategy) isPaused() bool { @@ -277,11 +287,19 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout) defer cancel() err := c.host.Connect(ctx, pi) - if err != nil && !errors.Is(err, context.Canceled) { - c.addConnectionBackoff(pi.ID) - c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID) - c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) - } + c.HandleDialError(err, pi.ID) c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) <-sem } + +func (c *PeerConnectionStrategy) HandleDialError(err error, peerID peer.ID) { + if err != nil && !errors.Is(err, context.Canceled) { + c.addConnectionBackoff(peerID) + c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID) + c.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err)) + emitterErr := c.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID}) + if emitterErr != nil { + c.logger.Error("failed to emit DialError", zap.Error(emitterErr)) + } + } +} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 046666ae..12dceef2 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -223,6 +223,7 @@ func TestConnectToRelayPeers(t *testing.T) { ctx, pm, deferFn := initTest(t) pc, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 120*time.Second, pm.logger) require.NoError(t, err) + pc.SetHost(pm.host) err = pc.Start(ctx) require.NoError(t, err) pm.Start(ctx) diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 8321dc3e..b732fa14 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -5,6 +5,11 @@ import ( "github.com/multiformats/go-multiaddr" ) +type DialError struct { + Err error + PeerID peer.ID +} + // GetPeerID is used to extract the peerID from a multiaddress func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)