fix(wakuv2): don't wait for connections to login (#4293)

- Identify will not block logout
- Use `peer.AddrInfo` instead of multiaddresses
- Modifies some logs to reduce noise
This commit is contained in:
richΛrd 2023-11-10 11:31:59 -04:00 committed by GitHub
parent dc2d179692
commit de12ca885c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 36 additions and 47 deletions

View File

@ -423,23 +423,15 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error {
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
if len(d.PeerInfo.Addrs) != 0 {
go func(ma multiaddr.Multiaddr) {
w.identifyAndConnect(ctx, w.settings.LightClient, ma)
wg.Done()
}(d.PeerInfo.Addrs[0])
go w.identifyAndConnect(ctx, w.settings.LightClient, d.PeerInfo)
}
}
identifyWg := &sync.WaitGroup{}
identifyWg.Add(len(cfg.WakuNodes))
for _, addrString := range cfg.WakuNodes {
addrString := addrString
if strings.HasPrefix(addrString, "enrtree://") {
// Use DNS Discovery
go func() {
w.dnsDiscover(ctx, addrString, fnApply)
identifyWg.Done()
}()
go w.dnsDiscover(ctx, addrString, fnApply)
} else {
// It is a normal multiaddress
addr, err := multiaddr.NewMultiaddr(addrString)
@ -448,30 +440,26 @@ func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error {
continue
}
go func(ma multiaddr.Multiaddr) {
w.identifyAndConnect(ctx, cfg.LightClient, ma)
identifyWg.Done()
}(addr)
peerInfo, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
w.logger.Warn("invalid peer multiaddress", zap.Stringer("addr", addr), zap.Error(err))
continue
}
go w.identifyAndConnect(ctx, cfg.LightClient, *peerInfo)
}
}
identifyWg.Wait()
return nil
}
func (w *Waku) identifyAndConnect(ctx context.Context, isLightClient bool, ma multiaddr.Multiaddr) {
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
w.logger.Warn("invalid peer multiaddress", zap.String("addr", ma.String()), zap.Error(err))
return
}
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
func (w *Waku) identifyAndConnect(ctx context.Context, isLightClient bool, peerInfo peer.AddrInfo) {
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
defer cancel()
err = w.node.Host().Connect(ctx, *peerInfo)
err := w.node.Host().Connect(ctx, peerInfo)
if err != nil {
w.logger.Error("could not extract peerinfo", zap.String("ma", ma.String()), zap.Error(err))
w.logger.Error("could not connect to peer", zap.Any("peer", peerInfo), zap.Error(err))
return
}
@ -480,26 +468,29 @@ func (w *Waku) identifyAndConnect(ctx context.Context, isLightClient bool, ma mu
return // No connection
}
w.identifyService.IdentifyConn(conns[0])
if isLightClient {
err = w.node.Host().Network().ClosePeer(peerInfo.ID)
if err != nil {
w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
select {
case <-w.ctx.Done():
return
case <-w.identifyService.IdentifyWait(conns[0]):
if isLightClient {
err = w.node.Host().Network().ClosePeer(peerInfo.ID)
if err != nil {
w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
}
return
}
return
}
supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(peerInfo.ID, relay.WakuRelayID_v200)
if err != nil {
w.logger.Error("could not obtain protocols", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
return
}
if len(supportedProtocols) == 0 {
err = w.node.Host().Network().ClosePeer(peerInfo.ID)
supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(peerInfo.ID, relay.WakuRelayID_v200)
if err != nil {
w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
w.logger.Error("could not obtain protocols", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
return
}
if len(supportedProtocols) == 0 {
err = w.node.Host().Network().ClosePeer(peerInfo.ID)
if err != nil {
w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err))
}
}
}
}
@ -1566,12 +1557,11 @@ func (w *Waku) seedBootnodesForDiscV5() {
select {
case <-ticker.C:
if w.seededBootnodesForDiscV5 && len(w.node.Host().Network().Peers()) > 3 {
w.logger.Info("not querying bootnodes", zap.Bool("seeded", w.seededBootnodesForDiscV5), zap.Int("peer-count", len(w.node.Host().Network().Peers())))
w.logger.Debug("not querying bootnodes", zap.Bool("seeded", w.seededBootnodesForDiscV5), zap.Int("peer-count", len(w.node.Host().Network().Peers())))
continue
}
if canQuery() {
w.logger.Info("querying bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())))
w.logger.Info("querying bootnodes to restore connectivity")
w.logger.Info("querying bootnodes to restore connectivity", zap.Int("peer-count", len(w.node.Host().Network().Peers())))
err := w.restartDiscV5()
if err != nil {
w.logger.Warn("failed to restart discv5", zap.Error(err))
@ -1585,8 +1575,7 @@ func (w *Waku) seedBootnodesForDiscV5() {
}
} else {
w.logger.Info("can't query bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())))
w.logger.Info("can't query", zap.Int64("lastTry", lastTry), zap.Int64("now", now()), zap.Int64("backoff", bootnodesQueryBackoffMs*int64(math.Exp2(float64(retries)))), zap.Int("retries", retries))
w.logger.Info("can't query bootnodes", zap.Int("peer-count", len(w.node.Host().Network().Peers())), zap.Int64("lastTry", lastTry), zap.Int64("now", now()), zap.Int64("backoff", bootnodesQueryBackoffMs*int64(math.Exp2(float64(retries)))), zap.Int("retries", retries))
}
// If we go online, trigger immediately