diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 1b9a15a4d..6b90e85fe 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -35,7 +35,6 @@ import ( "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/multiformats/go-multiaddr" "google.golang.org/protobuf/proto" @@ -91,9 +90,8 @@ type ITelemetryClient interface { // Waku represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku struct { - node *node.WakuNode // reference to a libp2p waku node - identifyService identify.IDService - appDB *sql.DB + node *node.WakuNode // reference to a libp2p waku node + appDB *sql.DB dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map @@ -414,7 +412,7 @@ func (w *Waku) discoverAndConnectPeers() error { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { defer wg.Done() if len(d.PeerInfo.Addrs) != 0 { - go w.identifyAndConnect(w.ctx, w.cfg.LightClient, d.PeerInfo) + go w.connect(d.PeerInfo, wps.DNSDiscovery) } } @@ -441,53 +439,17 @@ func (w *Waku) discoverAndConnectPeers() error { continue } - go w.identifyAndConnect(w.ctx, w.cfg.LightClient, *peerInfo) + go w.connect(*peerInfo, wps.Static) } } return nil } -func (w *Waku) identifyAndConnect(ctx context.Context, isLightClient bool, peerInfo peer.AddrInfo) { - ctx, cancel := context.WithTimeout(ctx, 7*time.Second) - defer cancel() - - err := w.node.Host().Connect(ctx, peerInfo) - if err != nil { - w.logger.Error("could not connect to peer", zap.Any("peer", peerInfo), zap.Error(err)) - return - } - - conns := w.node.Host().Network().ConnsToPeer(peerInfo.ID) - if len(conns) == 0 { - return // No connection - } - - select { - case <-w.ctx.Done(): - return - case <-w.identifyService.IdentifyWait(conns[0]): - if isLightClient { - err = w.node.Host().Network().ClosePeer(peerInfo.ID) - if err != nil { - w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err)) - } - return - } - - supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(peerInfo.ID, relay.WakuRelayID_v200) - if err != nil { - w.logger.Error("could not obtain protocols", zap.Stringer("peer", peerInfo.ID), zap.Error(err)) - return - } - - if len(supportedProtocols) == 0 { - err = w.node.Host().Network().ClosePeer(peerInfo.ID) - if err != nil { - w.logger.Error("could not close connections to peer", zap.Stringer("peer", peerInfo.ID), zap.Error(err)) - } - } - } +func (w *Waku) connect(peerInfo peer.AddrInfo, origin wps.Origin) { + // Connection will be prunned eventually by the connection manager if needed + // The peer connector in go-waku uses Connect, so it will execute identify as part of its + w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, true) } func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { @@ -546,38 +508,29 @@ func (w *Waku) runPeerExchangeLoop() { case <-ticker.C: w.logger.Info("Running peer exchange loop") - availablePeers := w.node.Host().Peerstore().Peers() - peersWithRelay := 0 - for _, p := range availablePeers { - supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(p, relay.WakuRelayID_v200) - if err != nil { - continue - } - if len(supportedProtocols) != 0 { - peersWithRelay++ - } - } - - peersToDiscover := w.cfg.DiscoveryLimit - peersWithRelay - if peersToDiscover <= 0 { - continue - } - // We select only the nodes discovered via DNS Discovery that support peer exchange + // We assume that those peers are running peer exchange according to infra config, + // If not, the peer selection process in go-waku will filter them out anyway w.dnsAddressCacheLock.RLock() + var peers []peer.ID for _, record := range w.dnsAddressCache { for _, discoveredNode := range record { if len(discoveredNode.PeerInfo.Addrs) == 0 { continue } - go w.identifyAndConnect(w.ctx, true, discoveredNode.PeerInfo) - + // Attempt to connect to the peers. + // Peers will be added to the libp2p peer store thanks to identify + go w.connect(discoveredNode.PeerInfo, wps.DNSDiscovery) + peers = append(peers, discoveredNode.PeerID) } } w.dnsAddressCacheLock.RUnlock() - err := w.node.PeerExchange().Request(w.ctx, peersToDiscover, peer_exchange.WithAutomaticPeerSelection()) - if err != nil { - w.logger.Error("couldnt request peers via peer exchange", zap.Error(err)) + + if len(peers) != 0 { + err := w.node.PeerExchange().Request(w.ctx, w.cfg.DiscoveryLimit, peer_exchange.WithAutomaticPeerSelection(peers...)) + if err != nil { + w.logger.Error("couldnt request peers via peer exchange", zap.Error(err)) + } } } } @@ -1169,14 +1122,6 @@ func (w *Waku) Start() error { w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID())) - idService, err := identify.NewIDService(w.node.Host()) - if err != nil { - return err - } - idService.Start() - - w.identifyService = idService - if err = w.discoverAndConnectPeers(); err != nil { return fmt.Errorf("failed to add wakuv2 peers: %v", err) } @@ -1317,15 +1262,10 @@ func (w *Waku) Stop() error { w.envelopeCache.Stop() - err := w.identifyService.Close() - if err != nil { - return err - } - w.node.Stop() if w.protectedTopicStore != nil { - err = w.protectedTopicStore.Close() + err := w.protectedTopicStore.Close() if err != nil { return err }