From 6e2a0ffeb434406147e8a4c0adb028bef16fe2e2 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 28 Feb 2023 14:02:47 -0400 Subject: [PATCH] refactor(px): use request-response instead of dialing back --- waku/v2/protocol/peer_exchange/client.go | 112 +++++++++++++++ .../{waku_peer_exchange.go => protocol.go} | 130 ++---------------- 2 files changed, 122 insertions(+), 120 deletions(-) create mode 100644 waku/v2/protocol/peer_exchange/client.go rename waku/v2/protocol/peer_exchange/{waku_peer_exchange.go => protocol.go} (66%) diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go new file mode 100644 index 00000000..3c97e5f1 --- /dev/null +++ b/waku/v2/protocol/peer_exchange/client.go @@ -0,0 +1,112 @@ +package peer_exchange + +import ( + "bytes" + "context" + "math" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/rlp" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-msgio/pbio" + "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error { + params := new(PeerExchangeParameters) + params.host = wakuPX.h + params.log = wakuPX.log + + optList := DefaultOptions(wakuPX.h) + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + + if params.selectedPeer == "" { + metrics.RecordPeerExchangeError(ctx, "dialError") + return ErrNoPeersAvailable + } + + requestRPC := &pb.PeerExchangeRPC{ + Query: &pb.PeerExchangeQuery{ + NumPeers: uint64(numPeers), + }, + } + + // We connect first so dns4 addresses are resolved (NewStream does not do it) + err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(params.selectedPeer)) + if err != nil { + return err + } + + connOpt, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) + if err != nil { + return err + } + defer connOpt.Close() + + writer := pbio.NewDelimitedWriter(connOpt) + err = writer.WriteMsg(requestRPC) + if err != nil { + return err + } + + reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) + responseRPC := &pb.PeerExchangeRPC{} + err = reader.ReadMsg(responseRPC) + if err != nil { + return err + } + + return wakuPX.handleResponse(ctx, responseRPC.Response) +} + +func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error { + var peers []peer.AddrInfo + for _, p := range response.PeerInfos { + enrRecord := &enr.Record{} + buf := bytes.NewBuffer(p.ENR) + + err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR)))) + if err != nil { + wakuPX.log.Error("converting bytes to enr", zap.Error(err)) + return err + } + + enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord) + if err != nil { + wakuPX.log.Error("creating enode record", zap.Error(err)) + + return err + } + + peerInfo, err := utils.EnodeToPeerInfo(enodeRecord) + if err != nil { + return err + } + + peers = append(peers, *peerInfo) + } + + if len(peers) != 0 { + wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) + wakuPX.wg.Add(1) + go func() { + defer wakuPX.wg.Done() + for _, p := range peers { + select { + case <-ctx.Done(): + return + case wakuPX.peerConnector.PeerChannel() <- p: + } + } + }() + } + + return nil +} diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go b/waku/v2/protocol/peer_exchange/protocol.go similarity index 66% rename from waku/v2/protocol/peer_exchange/waku_peer_exchange.go rename to waku/v2/protocol/peer_exchange/protocol.go index 3fd387c9..c78f07a9 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -11,10 +11,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/ethereum/go-ethereum/rlp" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -96,51 +93,6 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error { return nil } -func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error { - var peers []peer.AddrInfo - for _, p := range response.PeerInfos { - enrRecord := &enr.Record{} - buf := bytes.NewBuffer(p.ENR) - - err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR)))) - if err != nil { - wakuPX.log.Error("converting bytes to enr", zap.Error(err)) - return err - } - - enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord) - if err != nil { - wakuPX.log.Error("creating enode record", zap.Error(err)) - - return err - } - - peerInfo, err := utils.EnodeToPeerInfo(enodeRecord) - if err != nil { - return err - } - - peers = append(peers, *peerInfo) - } - - if len(peers) != 0 { - log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) - wakuPX.wg.Add(1) - go func() { - defer wakuPX.wg.Done() - for _, p := range peers { - select { - case <-ctx.Done(): - return - case wakuPX.peerConnector.PeerChannel() <- p: - } - } - }() - } - - return nil -} - func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) { return func(s network.Stream) { defer s.Close() @@ -156,19 +108,22 @@ func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.St if requestRPC.Query != nil { logger.Info("request received") - err := wakuPX.respond(ctx, requestRPC.Query.NumPeers, s.Conn().RemotePeer()) + + records, err := wakuPX.getENRsFromCache(requestRPC.Query.NumPeers) if err != nil { - logger.Error("responding", zap.Error(err)) + logger.Error("obtaining enrs from cache", zap.Error(err)) metrics.RecordPeerExchangeError(ctx, "pxFailure") return } - } - if requestRPC.Response != nil { - logger.Info("response received") - err := wakuPX.handleResponse(ctx, requestRPC.Response) + responseRPC := &pb.PeerExchangeRPC{} + responseRPC.Response = new(pb.PeerExchangeResponse) + responseRPC.Response.PeerInfos = records + + writer := pbio.NewDelimitedWriter(s) + err = writer.WriteMsg(responseRPC) if err != nil { - logger.Error("handling response", zap.Error(err)) + logger.Error("writing response", zap.Error(err)) metrics.RecordPeerExchangeError(ctx, "pxFailure") return } @@ -176,31 +131,6 @@ func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.St } } -func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error { - params := new(PeerExchangeParameters) - params.host = wakuPX.h - params.log = wakuPX.log - - optList := DefaultOptions(wakuPX.h) - optList = append(optList, opts...) - for _, opt := range optList { - opt(params) - } - - if params.selectedPeer == "" { - metrics.RecordPeerExchangeError(ctx, "dialError") - return ErrNoPeersAvailable - } - - requestRPC := &pb.PeerExchangeRPC{ - Query: &pb.PeerExchangeQuery{ - NumPeers: uint64(numPeers), - }, - } - - return wakuPX.sendPeerExchangeRPCToPeer(ctx, requestRPC, params.selectedPeer) -} - // Stop unmounts the peer exchange protocol func (wakuPX *WakuPeerExchange) Stop() { if wakuPX.cancel == nil { @@ -212,46 +142,6 @@ func (wakuPX *WakuPeerExchange) Stop() { wakuPX.wg.Wait() } -func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(ctx context.Context, rpc *pb.PeerExchangeRPC, peerID peer.ID) error { - logger := wakuPX.log.With(logging.HostID("peer", peerID)) - - // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(peerID)) - if err != nil { - logger.Error("connecting peer", zap.Error(err)) - return err - } - - connOpt, err := wakuPX.h.NewStream(ctx, peerID, PeerExchangeID_v20alpha1) - if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) - return err - } - defer connOpt.Close() - - writer := pbio.NewDelimitedWriter(connOpt) - err = writer.WriteMsg(rpc) - if err != nil { - logger.Error("writing response", zap.Error(err)) - return err - } - - return nil -} - -func (wakuPX *WakuPeerExchange) respond(ctx context.Context, numPeers uint64, peerID peer.ID) error { - records, err := wakuPX.getENRsFromCache(numPeers) - if err != nil { - return err - } - - responseRPC := &pb.PeerExchangeRPC{} - responseRPC.Response = new(pb.PeerExchangeResponse) - responseRPC.Response.PeerInfos = records - - return wakuPX.sendPeerExchangeRPCToPeer(ctx, responseRPC, peerID) -} - func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) { wakuPX.enrCacheMutex.Lock() defer wakuPX.enrCacheMutex.Unlock()