From 94ff99d727c4580b6a17ef44dad87b95a71457d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 7 Oct 2024 08:33:08 -0400 Subject: [PATCH] fix_: retry dnsdisc on failure (#5785) --- wakuv2/waku.go | 126 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 88 insertions(+), 38 deletions(-) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 230fd0bf2..5a402aabc 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -121,8 +121,9 @@ type Waku struct { node *node.WakuNode // reference to a libp2p waku node appDB *sql.DB - dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery - dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map + dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery + dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map + dnsDiscAsyncRetrievedSignal chan struct{} // Filter-related filters *common.Filters // Message filters installed with Subscribe function @@ -237,6 +238,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge wg: sync.WaitGroup{}, dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), dnsAddressCacheLock: &sync.RWMutex{}, + dnsDiscAsyncRetrievedSignal: make(chan struct{}), storeMsgIDs: make(map[gethcommon.Hash]bool), timesource: ts, storeMsgIDsMu: sync.RWMutex{}, @@ -282,7 +284,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge } if cfg.EnableDiscV5 { - bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes) + bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes, false) if err != nil { logger.Error("failed to get bootstrap nodes", zap.Error(err)) return nil, err @@ -370,7 +372,7 @@ func (w *Waku) GetNodeENRString() (string, error) { return w.node.ENR().String(), nil } -func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) ([]*enode.Node, error) { +func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string, useOnlyDnsDiscCache bool) ([]*enode.Node, error) { wg := sync.WaitGroup{} mu := sync.Mutex{} var result []*enode.Node @@ -397,10 +399,11 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) go func(addr string) { defer gocommon.LogOnPanic() defer wg.Done() - if err := w.dnsDiscover(ctx, addr, retrieveENR); err != nil { - mu.Lock() - w.seededBootnodesForDiscV5 = false - mu.Unlock() + if err := w.dnsDiscover(ctx, addr, retrieveENR, useOnlyDnsDiscCache); err != nil { + go func() { + defer gocommon.LogOnPanic() + w.retryDnsDiscoveryWithBackoff(ctx, addr, w.dnsDiscAsyncRetrievedSignal) + }() } }(addrString) } else { @@ -409,17 +412,23 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) if err != nil { return nil, err } + mu.Lock() result = append(result, bootnode) + mu.Unlock() } } wg.Wait() + if len(result) == 0 { + w.seededBootnodesForDiscV5 = false + } + return result, nil } type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) -func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) error { +func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer, useOnlyCache bool) error { w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress)) ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() @@ -428,7 +437,7 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA defer w.dnsAddressCacheLock.Unlock() discNodes, ok := w.dnsAddressCache[enrtreeAddress] - if !ok { + if !ok && !useOnlyCache { nameserver := w.cfg.Nameserver resolver := w.cfg.Resolver @@ -462,6 +471,36 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA return nil } +func (w *Waku) retryDnsDiscoveryWithBackoff(ctx context.Context, addr string, successChan chan<- struct{}) { + retries := 0 + for { + err := w.dnsDiscover(ctx, addr, func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {}, false) + if err == nil { + select { + case successChan <- struct{}{}: + default: + } + + break + } + + retries++ + backoff := time.Second * time.Duration(math.Exp2(float64(retries))) + if backoff > time.Minute { + backoff = time.Minute + } + + t := time.NewTimer(backoff) + select { + case <-w.ctx.Done(): + t.Stop() + return + case <-t.C: + t.Stop() + } + } +} + func (w *Waku) discoverAndConnectPeers() { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() @@ -476,7 +515,7 @@ func (w *Waku) discoverAndConnectPeers() { // Use DNS Discovery go func() { defer gocommon.LogOnPanic() - if err := w.dnsDiscover(w.ctx, addrString, fnApply); err != nil { + if err := w.dnsDiscover(w.ctx, addrString, fnApply, false); err != nil { w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString)) } }() @@ -1730,43 +1769,54 @@ func (w *Waku) seedBootnodesForDiscV5() { for { select { + case <-w.dnsDiscAsyncRetrievedSignal: + if !canQuery() { + continue + } + + err := w.restartDiscV5(true) + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + retries = 0 + lastTry = now() case <-ticker.C: if w.seededBootnodesForDiscV5 && len(w.node.Host().Network().Peers()) > 3 { w.logger.Debug("not querying bootnodes", zap.Bool("seeded", w.seededBootnodesForDiscV5), zap.Int("peer-count", len(w.node.Host().Network().Peers()))) continue } - if canQuery() { - w.logger.Info("querying bootnodes to restore connectivity", zap.Int("peer-count", len(w.node.Host().Network().Peers()))) - err := w.restartDiscV5() - if err != nil { - w.logger.Warn("failed to restart discv5", zap.Error(err)) - } - lastTry = now() - retries++ - // We reset the retries after a while and restart - if retries > bootnodesMaxRetries { - retries = 0 - } - - } else { + if !canQuery() { w.logger.Info("can't query bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())), zap.Int64("lastTry", lastTry), zap.Int64("now", now()), zap.Int64("backoff", bootnodesQueryBackoffMs*int64(math.Exp2(float64(retries)))), zap.Int("retries", retries)) - + continue } + + w.logger.Info("querying bootnodes to restore connectivity", zap.Int("peer-count", len(w.node.Host().Network().Peers()))) + err := w.restartDiscV5(false) + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + + lastTry = now() + retries++ + // We reset the retries after a while and restart + if retries > bootnodesMaxRetries { + retries = 0 + } + // If we go online, trigger immediately case <-w.goingOnline: - if w.cfg.EnableDiscV5 { - if canQuery() { - err := w.restartDiscV5() - if err != nil { - w.logger.Warn("failed to restart discv5", zap.Error(err)) - } - - } - retries = 0 - lastTry = now() + if !canQuery() { + continue } + err := w.restartDiscV5(false) + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + retries = 0 + lastTry = now() + case <-w.ctx.Done(): w.logger.Debug("bootnode seeding stopped") return @@ -1775,10 +1825,10 @@ func (w *Waku) seedBootnodesForDiscV5() { } // Restart discv5, re-retrieving bootstrap nodes -func (w *Waku) restartDiscV5() error { +func (w *Waku) restartDiscV5(useOnlyDNSDiscCache bool) error { ctx, cancel := context.WithTimeout(w.ctx, 30*time.Second) defer cancel() - bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes) + bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes, useOnlyDNSDiscCache) if err != nil { return err }