mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-09 09:23:07 +00:00
feat: handle dial errors from new streams
This commit is contained in:
parent
8916615ba3
commit
435c1b107d
@ -751,7 +751,12 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
|
||||
|
||||
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
|
||||
err := w.host.Connect(ctx, info)
|
||||
w.peerConnector.HandleDialError(err, info.ID)
|
||||
if err != nil {
|
||||
if w.peermanager != nil {
|
||||
w.peermanager.HandleDialError(err, info.ID)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for _, addr := range info.Addrs {
|
||||
// TODO: this is a temporary fix
|
||||
|
||||
@ -4,7 +4,6 @@ package peermanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -19,9 +18,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/service"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"go.uber.org/zap"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
@ -41,9 +38,8 @@ type PeerConnectionStrategy struct {
|
||||
*service.CommonDiscoveryService
|
||||
subscriptions []subscription
|
||||
|
||||
backoff backoff.BackoffFactory
|
||||
logger *zap.Logger
|
||||
evtDialError event.Emitter
|
||||
backoff backoff.BackoffFactory
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
type subscription struct {
|
||||
@ -159,11 +155,6 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) {
|
||||
// Start attempts to connect to the peers passed in by peerCh.
|
||||
// Will not connect to peers if they are within the backoff period.
|
||||
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
|
||||
var err error
|
||||
c.evtDialError, err = c.host.EventBus().Emitter(new(utils.DialError))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.CommonDiscoveryService.Start(ctx, c.start)
|
||||
|
||||
}
|
||||
@ -179,9 +170,7 @@ func (c *PeerConnectionStrategy) start() error {
|
||||
|
||||
// Stop terminates the peer-connector
|
||||
func (c *PeerConnectionStrategy) Stop() {
|
||||
c.CommonDiscoveryService.Stop(func() {
|
||||
c.evtDialError.Close()
|
||||
})
|
||||
c.CommonDiscoveryService.Stop(func() {})
|
||||
}
|
||||
|
||||
func (c *PeerConnectionStrategy) isPaused() bool {
|
||||
@ -287,19 +276,10 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
|
||||
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
|
||||
defer cancel()
|
||||
err := c.host.Connect(ctx, pi)
|
||||
c.HandleDialError(err, pi.ID)
|
||||
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
|
||||
if err != nil {
|
||||
c.pm.HandleDialError(err, pi.ID)
|
||||
} else {
|
||||
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
|
||||
}
|
||||
<-sem
|
||||
}
|
||||
|
||||
func (c *PeerConnectionStrategy) HandleDialError(err error, peerID peer.ID) {
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
c.addConnectionBackoff(peerID)
|
||||
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
|
||||
c.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
emitterErr := c.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
|
||||
if emitterErr != nil {
|
||||
c.logger.Error("failed to emit DialError", zap.Error(emitterErr))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/service"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -87,6 +88,7 @@ type PeerManager struct {
|
||||
TopicHealthNotifCh chan<- TopicHealthStatus
|
||||
rttCache *FastestPeerSelector
|
||||
RelayEnabled bool
|
||||
evtDialError event.Emitter
|
||||
}
|
||||
|
||||
// PeerSelection provides various options based on which Peer is selected from a list of peers.
|
||||
@ -249,6 +251,14 @@ func (pm *PeerManager) Start(ctx context.Context) {
|
||||
go pm.connectivityLoop(ctx)
|
||||
}
|
||||
go pm.peerStoreLoop(ctx)
|
||||
|
||||
if pm.host != nil {
|
||||
var err error
|
||||
pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError))
|
||||
if err != nil {
|
||||
pm.logger.Error("failed to create dial error emitter", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
|
||||
@ -719,3 +729,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
|
||||
// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
|
||||
pm.serviceSlots.getPeers(proto).add(peerID)
|
||||
}
|
||||
|
||||
func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
|
||||
if err == nil || errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
if pm.peerConnector != nil {
|
||||
pm.peerConnector.addConnectionBackoff(peerID)
|
||||
}
|
||||
if pm.host != nil {
|
||||
pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
|
||||
}
|
||||
pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
if pm.evtDialError != nil {
|
||||
emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
|
||||
if emitterErr != nil {
|
||||
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -245,8 +245,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
||||
stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
|
||||
if err != nil {
|
||||
wf.metrics.RecordError(dialFailure)
|
||||
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peerID)
|
||||
if wf.pm != nil {
|
||||
wf.pm.HandleDialError(err, peerID)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@ import (
|
||||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
@ -38,6 +38,7 @@ type (
|
||||
log *zap.Logger
|
||||
*service.CommonService
|
||||
subscriptions *SubscribersMap
|
||||
pm *peermanager.PeerManager
|
||||
|
||||
maxSubscriptions int
|
||||
}
|
||||
@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
|
||||
wf.maxSubscriptions = params.MaxSubscribers
|
||||
if params.pm != nil {
|
||||
params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField)
|
||||
wf.pm = params.pm
|
||||
}
|
||||
return wf
|
||||
}
|
||||
@ -274,8 +276,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
|
||||
wf.metrics.RecordError(pushTimeoutFailure)
|
||||
} else {
|
||||
wf.metrics.RecordError(dialFailure)
|
||||
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peerID)
|
||||
if wf.pm != nil {
|
||||
wf.pm.HandleDialError(err, peerID)
|
||||
}
|
||||
}
|
||||
logger.Error("opening peer stream", zap.Error(err))
|
||||
|
||||
@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
|
||||
|
||||
stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
store.metrics.RecordError(dialFailure)
|
||||
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(selectedPeer)
|
||||
if store.pm != nil {
|
||||
store.pm.HandleDialError(err, selectedPeer)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
|
||||
|
||||
stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
wakuLP.metrics.RecordError(dialFailure)
|
||||
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peerID)
|
||||
if wakuLP.pm != nil {
|
||||
wakuLP.pm.HandleDialError(err, peerID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -76,8 +76,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
||||
|
||||
stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
|
||||
if err != nil {
|
||||
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(params.selectedPeer)
|
||||
if wakuPX.pm != nil {
|
||||
wakuPX.pm.HandleDialError(err, params.selectedPeer)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -281,9 +281,8 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
|
||||
|
||||
stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(params.selectedPeer)
|
||||
if s.pm != nil {
|
||||
s.pm.HandleDialError(err, params.selectedPeer)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user