fix_: retry dnsdisc on failure (#5785)

This commit is contained in:
richΛrd 2024-10-07 08:33:08 -04:00 committed by GitHub
parent ccc3e62ce6
commit 94ff99d727
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 88 additions and 38 deletions

View File

@ -121,8 +121,9 @@ type Waku struct {
node *node.WakuNode // reference to a libp2p waku node node *node.WakuNode // reference to a libp2p waku node
appDB *sql.DB appDB *sql.DB
dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery
dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map
dnsDiscAsyncRetrievedSignal chan struct{}
// Filter-related // Filter-related
filters *common.Filters // Message filters installed with Subscribe function filters *common.Filters // Message filters installed with Subscribe function
@ -237,6 +238,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
wg: sync.WaitGroup{}, wg: sync.WaitGroup{},
dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode),
dnsAddressCacheLock: &sync.RWMutex{}, dnsAddressCacheLock: &sync.RWMutex{},
dnsDiscAsyncRetrievedSignal: make(chan struct{}),
storeMsgIDs: make(map[gethcommon.Hash]bool), storeMsgIDs: make(map[gethcommon.Hash]bool),
timesource: ts, timesource: ts,
storeMsgIDsMu: sync.RWMutex{}, storeMsgIDsMu: sync.RWMutex{},
@ -282,7 +284,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
} }
if cfg.EnableDiscV5 { if cfg.EnableDiscV5 {
bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes) bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes, false)
if err != nil { if err != nil {
logger.Error("failed to get bootstrap nodes", zap.Error(err)) logger.Error("failed to get bootstrap nodes", zap.Error(err))
return nil, err return nil, err
@ -370,7 +372,7 @@ func (w *Waku) GetNodeENRString() (string, error) {
return w.node.ENR().String(), nil return w.node.ENR().String(), nil
} }
func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string, useOnlyDnsDiscCache bool) ([]*enode.Node, error) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
mu := sync.Mutex{} mu := sync.Mutex{}
var result []*enode.Node var result []*enode.Node
@ -397,10 +399,11 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string)
go func(addr string) { go func(addr string) {
defer gocommon.LogOnPanic() defer gocommon.LogOnPanic()
defer wg.Done() defer wg.Done()
if err := w.dnsDiscover(ctx, addr, retrieveENR); err != nil { if err := w.dnsDiscover(ctx, addr, retrieveENR, useOnlyDnsDiscCache); err != nil {
mu.Lock() go func() {
w.seededBootnodesForDiscV5 = false defer gocommon.LogOnPanic()
mu.Unlock() w.retryDnsDiscoveryWithBackoff(ctx, addr, w.dnsDiscAsyncRetrievedSignal)
}()
} }
}(addrString) }(addrString)
} else { } else {
@ -409,17 +412,23 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mu.Lock()
result = append(result, bootnode) result = append(result, bootnode)
mu.Unlock()
} }
} }
wg.Wait() wg.Wait()
if len(result) == 0 {
w.seededBootnodesForDiscV5 = false
}
return result, nil return result, nil
} }
type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup)
func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer, useOnlyCache bool) error {
w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress)) w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress))
ctx, cancel := context.WithTimeout(ctx, requestTimeout) ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel() defer cancel()
@ -428,7 +437,7 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
defer w.dnsAddressCacheLock.Unlock() defer w.dnsAddressCacheLock.Unlock()
discNodes, ok := w.dnsAddressCache[enrtreeAddress] discNodes, ok := w.dnsAddressCache[enrtreeAddress]
if !ok { if !ok && !useOnlyCache {
nameserver := w.cfg.Nameserver nameserver := w.cfg.Nameserver
resolver := w.cfg.Resolver resolver := w.cfg.Resolver
@ -462,6 +471,36 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
return nil return nil
} }
func (w *Waku) retryDnsDiscoveryWithBackoff(ctx context.Context, addr string, successChan chan<- struct{}) {
retries := 0
for {
err := w.dnsDiscover(ctx, addr, func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {}, false)
if err == nil {
select {
case successChan <- struct{}{}:
default:
}
break
}
retries++
backoff := time.Second * time.Duration(math.Exp2(float64(retries)))
if backoff > time.Minute {
backoff = time.Minute
}
t := time.NewTimer(backoff)
select {
case <-w.ctx.Done():
t.Stop()
return
case <-t.C:
t.Stop()
}
}
}
func (w *Waku) discoverAndConnectPeers() { func (w *Waku) discoverAndConnectPeers() {
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
@ -476,7 +515,7 @@ func (w *Waku) discoverAndConnectPeers() {
// Use DNS Discovery // Use DNS Discovery
go func() { go func() {
defer gocommon.LogOnPanic() defer gocommon.LogOnPanic()
if err := w.dnsDiscover(w.ctx, addrString, fnApply); err != nil { if err := w.dnsDiscover(w.ctx, addrString, fnApply, false); err != nil {
w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString)) w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString))
} }
}() }()
@ -1730,43 +1769,54 @@ func (w *Waku) seedBootnodesForDiscV5() {
for { for {
select { select {
case <-w.dnsDiscAsyncRetrievedSignal:
if !canQuery() {
continue
}
err := w.restartDiscV5(true)
if err != nil {
w.logger.Warn("failed to restart discv5", zap.Error(err))
}
retries = 0
lastTry = now()
case <-ticker.C: case <-ticker.C:
if w.seededBootnodesForDiscV5 && len(w.node.Host().Network().Peers()) > 3 { if w.seededBootnodesForDiscV5 && len(w.node.Host().Network().Peers()) > 3 {
w.logger.Debug("not querying bootnodes", zap.Bool("seeded", w.seededBootnodesForDiscV5), zap.Int("peer-count", len(w.node.Host().Network().Peers()))) w.logger.Debug("not querying bootnodes", zap.Bool("seeded", w.seededBootnodesForDiscV5), zap.Int("peer-count", len(w.node.Host().Network().Peers())))
continue continue
} }
if canQuery() {
w.logger.Info("querying bootnodes to restore connectivity", zap.Int("peer-count", len(w.node.Host().Network().Peers())))
err := w.restartDiscV5()
if err != nil {
w.logger.Warn("failed to restart discv5", zap.Error(err))
}
lastTry = now() if !canQuery() {
retries++
// We reset the retries after a while and restart
if retries > bootnodesMaxRetries {
retries = 0
}
} else {
w.logger.Info("can't query bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())), zap.Int64("lastTry", lastTry), zap.Int64("now", now()), zap.Int64("backoff", bootnodesQueryBackoffMs*int64(math.Exp2(float64(retries)))), zap.Int("retries", retries)) w.logger.Info("can't query bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())), zap.Int64("lastTry", lastTry), zap.Int64("now", now()), zap.Int64("backoff", bootnodesQueryBackoffMs*int64(math.Exp2(float64(retries)))), zap.Int("retries", retries))
continue
} }
w.logger.Info("querying bootnodes to restore connectivity", zap.Int("peer-count", len(w.node.Host().Network().Peers())))
err := w.restartDiscV5(false)
if err != nil {
w.logger.Warn("failed to restart discv5", zap.Error(err))
}
lastTry = now()
retries++
// We reset the retries after a while and restart
if retries > bootnodesMaxRetries {
retries = 0
}
// If we go online, trigger immediately // If we go online, trigger immediately
case <-w.goingOnline: case <-w.goingOnline:
if w.cfg.EnableDiscV5 { if !canQuery() {
if canQuery() { continue
err := w.restartDiscV5()
if err != nil {
w.logger.Warn("failed to restart discv5", zap.Error(err))
}
}
retries = 0
lastTry = now()
} }
err := w.restartDiscV5(false)
if err != nil {
w.logger.Warn("failed to restart discv5", zap.Error(err))
}
retries = 0
lastTry = now()
case <-w.ctx.Done(): case <-w.ctx.Done():
w.logger.Debug("bootnode seeding stopped") w.logger.Debug("bootnode seeding stopped")
return return
@ -1775,10 +1825,10 @@ func (w *Waku) seedBootnodesForDiscV5() {
} }
// Restart discv5, re-retrieving bootstrap nodes // Restart discv5, re-retrieving bootstrap nodes
func (w *Waku) restartDiscV5() error { func (w *Waku) restartDiscV5(useOnlyDNSDiscCache bool) error {
ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second) ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second)
defer cancel() defer cancel()
bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes) bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes, useOnlyDNSDiscCache)
if err != nil { if err != nil {
return err return err
} }