mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-04 23:13:09 +00:00
refactor(px): use request-response instead of dialing back
This commit is contained in:
parent
cedaa670c7
commit
6e2a0ffeb4
112
waku/v2/protocol/peer_exchange/client.go
Normal file
112
waku/v2/protocol/peer_exchange/client.go
Normal file
@ -0,0 +1,112 @@
|
||||
package peer_exchange
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/waku-org/go-waku/waku/v2/metrics"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
|
||||
params := new(PeerExchangeParameters)
|
||||
params.host = wakuPX.h
|
||||
params.log = wakuPX.log
|
||||
|
||||
optList := DefaultOptions(wakuPX.h)
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
if params.selectedPeer == "" {
|
||||
metrics.RecordPeerExchangeError(ctx, "dialError")
|
||||
return ErrNoPeersAvailable
|
||||
}
|
||||
|
||||
requestRPC := &pb.PeerExchangeRPC{
|
||||
Query: &pb.PeerExchangeQuery{
|
||||
NumPeers: uint64(numPeers),
|
||||
},
|
||||
}
|
||||
|
||||
// We connect first so dns4 addresses are resolved (NewStream does not do it)
|
||||
err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(params.selectedPeer))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
connOpt, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer connOpt.Close()
|
||||
|
||||
writer := pbio.NewDelimitedWriter(connOpt)
|
||||
err = writer.WriteMsg(requestRPC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32)
|
||||
responseRPC := &pb.PeerExchangeRPC{}
|
||||
err = reader.ReadMsg(responseRPC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return wakuPX.handleResponse(ctx, responseRPC.Response)
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error {
|
||||
var peers []peer.AddrInfo
|
||||
for _, p := range response.PeerInfos {
|
||||
enrRecord := &enr.Record{}
|
||||
buf := bytes.NewBuffer(p.ENR)
|
||||
|
||||
err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR))))
|
||||
if err != nil {
|
||||
wakuPX.log.Error("converting bytes to enr", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
||||
if err != nil {
|
||||
wakuPX.log.Error("creating enode record", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
peerInfo, err := utils.EnodeToPeerInfo(enodeRecord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
peers = append(peers, *peerInfo)
|
||||
}
|
||||
|
||||
if len(peers) != 0 {
|
||||
wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
|
||||
wakuPX.wg.Add(1)
|
||||
go func() {
|
||||
defer wakuPX.wg.Done()
|
||||
for _, p := range peers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case wakuPX.peerConnector.PeerChannel() <- p:
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -11,10 +11,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
@ -96,51 +93,6 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error {
|
||||
var peers []peer.AddrInfo
|
||||
for _, p := range response.PeerInfos {
|
||||
enrRecord := &enr.Record{}
|
||||
buf := bytes.NewBuffer(p.ENR)
|
||||
|
||||
err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR))))
|
||||
if err != nil {
|
||||
wakuPX.log.Error("converting bytes to enr", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
||||
if err != nil {
|
||||
wakuPX.log.Error("creating enode record", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
peerInfo, err := utils.EnodeToPeerInfo(enodeRecord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
peers = append(peers, *peerInfo)
|
||||
}
|
||||
|
||||
if len(peers) != 0 {
|
||||
log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
|
||||
wakuPX.wg.Add(1)
|
||||
go func() {
|
||||
defer wakuPX.wg.Done()
|
||||
for _, p := range peers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case wakuPX.peerConnector.PeerChannel() <- p:
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) {
|
||||
return func(s network.Stream) {
|
||||
defer s.Close()
|
||||
@ -156,19 +108,22 @@ func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.St
|
||||
|
||||
if requestRPC.Query != nil {
|
||||
logger.Info("request received")
|
||||
err := wakuPX.respond(ctx, requestRPC.Query.NumPeers, s.Conn().RemotePeer())
|
||||
|
||||
records, err := wakuPX.getENRsFromCache(requestRPC.Query.NumPeers)
|
||||
if err != nil {
|
||||
logger.Error("responding", zap.Error(err))
|
||||
logger.Error("obtaining enrs from cache", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(ctx, "pxFailure")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if requestRPC.Response != nil {
|
||||
logger.Info("response received")
|
||||
err := wakuPX.handleResponse(ctx, requestRPC.Response)
|
||||
responseRPC := &pb.PeerExchangeRPC{}
|
||||
responseRPC.Response = new(pb.PeerExchangeResponse)
|
||||
responseRPC.Response.PeerInfos = records
|
||||
|
||||
writer := pbio.NewDelimitedWriter(s)
|
||||
err = writer.WriteMsg(responseRPC)
|
||||
if err != nil {
|
||||
logger.Error("handling response", zap.Error(err))
|
||||
logger.Error("writing response", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(ctx, "pxFailure")
|
||||
return
|
||||
}
|
||||
@ -176,31 +131,6 @@ func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.St
|
||||
}
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
|
||||
params := new(PeerExchangeParameters)
|
||||
params.host = wakuPX.h
|
||||
params.log = wakuPX.log
|
||||
|
||||
optList := DefaultOptions(wakuPX.h)
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
if params.selectedPeer == "" {
|
||||
metrics.RecordPeerExchangeError(ctx, "dialError")
|
||||
return ErrNoPeersAvailable
|
||||
}
|
||||
|
||||
requestRPC := &pb.PeerExchangeRPC{
|
||||
Query: &pb.PeerExchangeQuery{
|
||||
NumPeers: uint64(numPeers),
|
||||
},
|
||||
}
|
||||
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(ctx, requestRPC, params.selectedPeer)
|
||||
}
|
||||
|
||||
// Stop unmounts the peer exchange protocol
|
||||
func (wakuPX *WakuPeerExchange) Stop() {
|
||||
if wakuPX.cancel == nil {
|
||||
@ -212,46 +142,6 @@ func (wakuPX *WakuPeerExchange) Stop() {
|
||||
wakuPX.wg.Wait()
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(ctx context.Context, rpc *pb.PeerExchangeRPC, peerID peer.ID) error {
|
||||
logger := wakuPX.log.With(logging.HostID("peer", peerID))
|
||||
|
||||
// We connect first so dns4 addresses are resolved (NewStream does not do it)
|
||||
err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(peerID))
|
||||
if err != nil {
|
||||
logger.Error("connecting peer", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
connOpt, err := wakuPX.h.NewStream(ctx, peerID, PeerExchangeID_v20alpha1)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer connOpt.Close()
|
||||
|
||||
writer := pbio.NewDelimitedWriter(connOpt)
|
||||
err = writer.WriteMsg(rpc)
|
||||
if err != nil {
|
||||
logger.Error("writing response", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) respond(ctx context.Context, numPeers uint64, peerID peer.ID) error {
|
||||
records, err := wakuPX.getENRsFromCache(numPeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
responseRPC := &pb.PeerExchangeRPC{}
|
||||
responseRPC.Response = new(pb.PeerExchangeResponse)
|
||||
responseRPC.Response.PeerInfos = records
|
||||
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(ctx, responseRPC, peerID)
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) {
|
||||
wakuPX.enrCacheMutex.Lock()
|
||||
defer wakuPX.enrCacheMutex.Unlock()
|
||||
Loading…
x
Reference in New Issue
Block a user