mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-05-19 17:59:31 +00:00
feat: emit an event in EventBus upon dial error
This commit is contained in:
parent
821481fec4
commit
8916615ba3
@ -751,10 +751,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
|
||||
|
||||
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
|
||||
err := w.host.Connect(ctx, info)
|
||||
if err != nil {
|
||||
w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID)
|
||||
return err
|
||||
}
|
||||
w.peerConnector.HandleDialError(err, info.ID)
|
||||
|
||||
for _, addr := range info.Addrs {
|
||||
// TODO: this is a temporary fix
|
||||
|
||||
@ -19,7 +19,9 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/service"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"go.uber.org/zap"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
@ -39,8 +41,9 @@ type PeerConnectionStrategy struct {
|
||||
*service.CommonDiscoveryService
|
||||
subscriptions []subscription
|
||||
|
||||
backoff backoff.BackoffFactory
|
||||
logger *zap.Logger
|
||||
backoff backoff.BackoffFactory
|
||||
logger *zap.Logger
|
||||
evtDialError event.Emitter
|
||||
}
|
||||
|
||||
type subscription struct {
|
||||
@ -156,6 +159,11 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) {
|
||||
// Start attempts to connect to the peers passed in by peerCh.
|
||||
// Will not connect to peers if they are within the backoff period.
|
||||
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
|
||||
var err error
|
||||
c.evtDialError, err = c.host.EventBus().Emitter(new(utils.DialError))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.CommonDiscoveryService.Start(ctx, c.start)
|
||||
|
||||
}
|
||||
@ -171,7 +179,9 @@ func (c *PeerConnectionStrategy) start() error {
|
||||
|
||||
// Stop terminates the peer-connector
|
||||
func (c *PeerConnectionStrategy) Stop() {
|
||||
c.CommonDiscoveryService.Stop(func() {})
|
||||
c.CommonDiscoveryService.Stop(func() {
|
||||
c.evtDialError.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func (c *PeerConnectionStrategy) isPaused() bool {
|
||||
@ -277,11 +287,19 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
|
||||
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
|
||||
defer cancel()
|
||||
err := c.host.Connect(ctx, pi)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
c.addConnectionBackoff(pi.ID)
|
||||
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID)
|
||||
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
|
||||
}
|
||||
c.HandleDialError(err, pi.ID)
|
||||
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
|
||||
<-sem
|
||||
}
|
||||
|
||||
func (c *PeerConnectionStrategy) HandleDialError(err error, peerID peer.ID) {
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
c.addConnectionBackoff(peerID)
|
||||
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
|
||||
c.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
emitterErr := c.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
|
||||
if emitterErr != nil {
|
||||
c.logger.Error("failed to emit DialError", zap.Error(emitterErr))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -223,6 +223,7 @@ func TestConnectToRelayPeers(t *testing.T) {
|
||||
ctx, pm, deferFn := initTest(t)
|
||||
pc, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 120*time.Second, pm.logger)
|
||||
require.NoError(t, err)
|
||||
pc.SetHost(pm.host)
|
||||
err = pc.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
pm.Start(ctx)
|
||||
|
||||
@ -5,6 +5,11 @@ import (
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
type DialError struct {
|
||||
Err error
|
||||
PeerID peer.ID
|
||||
}
|
||||
|
||||
// GetPeerID is used to extract the peerID from a multiaddress
|
||||
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
|
||||
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user