go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange.go

365 lines
9.0 KiB
Go
Raw Normal View History

2022-10-23 13:13:43 +00:00
package peer_exchange
import (
"bufio"
"bytes"
"context"
"errors"
"math"
"math/rand"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
2022-10-23 13:13:43 +00:00
"go.uber.org/zap"
)
// PeerExchangeID_v20alpha1 is the libp2p protocol identifier under which the
// Waku Peer Exchange stream handler is registered and outgoing streams are opened.
const PeerExchangeID_v20alpha1 libp2pProtocol.ID = "/vac/waku/peer-exchange/2.0.0-alpha1"
const (
	// MaxCacheSize is the ENR cache size at which cleanCache starts evicting
	// records; below it the cache is left untouched.
	MaxCacheSize = 1000

	// CacheCleanWindow is the idx threshold applied by cleanCache: records whose
	// idx is not greater than it are dropped, and the idx of every surviving
	// record is lowered by this amount.
	CacheCleanWindow = 200

	// dialTimeout bounds each connection attempt to a peer learned from a
	// peer exchange response.
	dialTimeout = 7 * time.Second
)
// ErrNoPeersAvailable is returned by Request when, after applying all options,
// no peer has been selected to send the query to.
var ErrNoPeersAvailable = errors.New("no suitable remote peers")

// ErrInvalidId reports an invalid request id. It is not referenced by the code
// visible in this file.
var ErrInvalidId = errors.New("invalid request id")
// peerRecord is the value kept in the ENR cache for one discovered node.
type peerRecord struct {
	// node is the discovered node; its record is RLP-encoded when answering
	// peer exchange queries.
	node enode.Node

	// idx is the cache size observed when the record was stored. cleanCache
	// compares it against CacheCleanWindow to decide which records to evict.
	idx int
}
// WakuPeerExchange implements the Waku peer exchange protocol: it answers
// queries with ENRs collected through discv5 and connects to peers received
// in responses.
type WakuPeerExchange struct {
	// h is the libp2p host used to mount the stream handler, dial peers and
	// open streams.
	h host.Host

	// ctx is the base context handed to dials, stream creation and metrics.
	ctx context.Context

	// disc is the discv5 service queried for nodes; it may be nil, in which
	// case the discovery loop exits immediately.
	disc *discv5.DiscoveryV5

	// log is the logger for this protocol, named "wakupx".
	log *zap.Logger

	// quit is created by Start and closed by Stop to end the discovery loop.
	quit chan struct{}

	// wg tracks the discovery loop goroutine so Stop can wait for it.
	wg sync.WaitGroup

	// enrCache holds the discovered nodes, keyed by node ID.
	// todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
	enrCache map[enode.ID]peerRecord

	// enrCacheMutex guards enrCache.
	enrCacheMutex sync.RWMutex

	// rng picks the random subset of cached records returned to a requester.
	rng *rand.Rand

	// started records whether Start has run without a subsequent Stop.
	started bool
}
// NewWakuPeerExchange builds a WakuPeerExchange bound to the given context,
// host and discv5 service. The logger is scoped under the name "wakupx", the
// ENR cache starts empty, and the instance gets its own random source seeded
// from the package-level generator. The protocol is not mounted until Start
// is called.
func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange {
	return &WakuPeerExchange{
		h:        h,
		ctx:      ctx,
		disc:     disc,
		log:      log.Named("wakupx"),
		enrCache: make(map[enode.ID]peerRecord),
		rng:      rand.New(rand.NewSource(rand.Int63())),
	}
}
// Start mounts the peer exchange stream handler on the host and launches the
// background discv5 loop that fills the ENR cache. It always returns nil.
//
// Calling Start on an instance that is already started is a no-op. Without
// this guard a second call would replace the quit channel and spawn a second
// loop goroutine; the first goroutine would keep waiting on the old channel,
// which Stop never closes, so Stop would block forever in wg.Wait.
func (wakuPX *WakuPeerExchange) Start() error {
	if wakuPX.started {
		return nil
	}

	wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest)
	wakuPX.log.Info("Peer exchange protocol started")

	// The quit channel must exist before the loop goroutine starts selecting on it.
	wakuPX.quit = make(chan struct{}, 1)
	wakuPX.started = true

	wakuPX.wg.Add(1)
	go wakuPX.runPeerExchangeDiscv5Loop()
	return nil
}
// handleResponse decodes every ENR contained in a peer exchange response and
// dials the peers the host is not currently connected to.
//
// Each entry is RLP-decoded into an ENR, turned into an enode and then into
// libp2p address info. The first entry that fails any of those steps aborts
// the whole response and its error is returned; no peer is dialed in that
// case. Dial failures are only logged and never returned.
func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse) error {
	var peers []peer.AddrInfo
	for _, p := range response.PeerInfos {
		enrRecord := &enr.Record{}
		buf := bytes.NewBuffer(p.ENR)
		if err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR)))); err != nil {
			wakuPX.log.Error("converting bytes to enr", zap.Error(err))
			return err
		}

		enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
		if err != nil {
			wakuPX.log.Error("creating enode record", zap.Error(err))
			return err
		}

		peerInfo, err := utils.EnodeToPeerInfo(enodeRecord)
		if err != nil {
			return err
		}

		if wakuPX.h.Network().Connectedness(peerInfo.ID) != network.Connected {
			peers = append(peers, *peerInfo)
		}
	}

	if len(peers) == 0 {
		return nil
	}

	// The package-level logger used here is go-ethereum's, whose context is a
	// list of alternating keys and values; passing zap fields to it yields
	// malformed records.
	// NOTE(review): every other message of this type goes through wakuPX.log;
	// confirm whether these two are meant to use the go-ethereum logger.
	log.Info("connecting to newly discovered peers", "count", len(peers))
	for _, p := range peers {
		// The closure scopes the deferred cancel to a single dial, so each
		// timeout context is released as soon as its attempt finishes.
		func(p peer.AddrInfo) {
			ctx, cancel := context.WithTimeout(wakuPX.ctx, dialTimeout)
			defer cancel()
			if err := wakuPX.h.Connect(ctx, p); err != nil {
				log.Info("connecting to peer", "peer", p.ID.Pretty(), "err", err)
			}
		}(p)
	}
	return nil
}
// onRequest is the stream handler for the peer exchange protocol. It reads one
// length-delimited PeerExchangeRPC from the stream and then, in this order,
// answers the query it carries (if any) and processes the response it carries
// (if any). A failure at any step is logged, recorded as a metric and ends
// the handling of the message. The stream is closed on return.
func (wakuPX *WakuPeerExchange) onRequest(s network.Stream) {
	defer s.Close()

	remote := s.Conn().RemotePeer()
	logger := wakuPX.log.With(logging.HostID("peer", remote))

	rpc := &pb.PeerExchangeRPC{}
	if err := protoio.NewDelimitedReader(s, math.MaxInt32).ReadMsg(rpc); err != nil {
		logger.Error("reading request", zap.Error(err))
		metrics.RecordPeerExchangeError(wakuPX.ctx, "decodeRpcFailure")
		return
	}

	if query := rpc.Query; query != nil {
		logger.Info("request received")
		if err := wakuPX.respond(query.NumPeers, remote); err != nil {
			logger.Error("responding", zap.Error(err))
			metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
			// A failed reply also skips any response carried by the same message.
			return
		}
	}

	if response := rpc.Response; response != nil {
		logger.Info("response received")
		if err := wakuPX.handleResponse(response); err != nil {
			logger.Error("handling response", zap.Error(err))
			metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
		}
	}
}
// Request asks a peer for up to numPeers peers. The target peer is chosen by
// applying the default options followed by opts; when no peer ends up
// selected, a "dialError" metric is recorded and ErrNoPeersAvailable is
// returned. Otherwise the query is sent and any send error is returned.
//
// NOTE(review): ctx is not used by this body; the query is sent with the
// context stored on the receiver.
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
	params := &PeerExchangeParameters{
		host: wakuPX.h,
		log:  wakuPX.log,
	}

	// Defaults go first so caller-supplied options can override them.
	for _, apply := range append(DefaultOptions(wakuPX.h), opts...) {
		apply(params)
	}

	if params.selectedPeer == "" {
		metrics.RecordPeerExchangeError(wakuPX.ctx, "dialError")
		return ErrNoPeersAvailable
	}

	query := &pb.PeerExchangeQuery{NumPeers: uint64(numPeers)}
	return wakuPX.sendPeerExchangeRPCToPeer(&pb.PeerExchangeRPC{Query: query}, params.selectedPeer)
}
// IsStarted reports whether Start has been called without a subsequent Stop,
// i.e. whether the peer exchange protocol is currently mounted.
func (wakuPX *WakuPeerExchange) IsStarted() bool {
	return wakuPX.started
}
// Stop unmounts the peer exchange stream handler, signals the discovery loop
// to exit and waits for it to finish. It does nothing when the protocol is
// not started.
func (wakuPX *WakuPeerExchange) Stop() {
	if !wakuPX.started {
		return
	}

	wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
	wakuPX.started = false

	// Closing quit wakes the loop goroutine; Wait returns once it has exited.
	close(wakuPX.quit)
	wakuPX.wg.Wait()
}
// sendPeerExchangeRPCToPeer connects to peerID, opens a peer exchange stream
// to it and writes rpc as a single length-delimited message. The stream is
// closed before returning. Connection, stream-creation and write errors are
// logged and returned.
func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRPC, peerID peer.ID) error {
	logger := wakuPX.log.With(logging.HostID("peer", peerID))

	// We connect first so dns4 addresses are resolved (NewStream does not do it)
	target := wakuPX.h.Peerstore().PeerInfo(peerID)
	if err := wakuPX.h.Connect(wakuPX.ctx, target); err != nil {
		logger.Error("connecting peer", zap.Error(err))
		return err
	}

	stream, err := wakuPX.h.NewStream(wakuPX.ctx, peerID, PeerExchangeID_v20alpha1)
	if err != nil {
		logger.Error("creating stream to peer", zap.Error(err))
		return err
	}
	defer stream.Close()

	if err := protoio.NewDelimitedWriter(stream).WriteMsg(rpc); err != nil {
		logger.Error("writing response", zap.Error(err))
		return err
	}
	return nil
}
// respond answers a peer exchange query: it takes up to numPeers records from
// the ENR cache and sends them to peerID wrapped in a response RPC. Errors
// from reading the cache or from sending are returned.
func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error {
	records, err := wakuPX.getENRsFromCache(numPeers)
	if err != nil {
		return err
	}

	reply := &pb.PeerExchangeRPC{
		Response: &pb.PeerExchangeResponse{PeerInfos: records},
	}
	return wakuPX.sendPeerExchangeRPCToPeer(reply, peerID)
}
// getENRsFromCache returns the RLP-encoded ENRs of up to numPeers randomly
// chosen cached nodes. When fewer nodes are cached than requested, all of
// them are returned. It returns (nil, nil) when the cache is empty or
// numPeers is zero, and an error if a record cannot be encoded.
//
// numPeers comes straight from a remote query, so it is clamped against the
// cache size in uint64 space. Converting it to int first would turn values
// above the int range negative, bypass the clamp and panic when slicing the
// permutation.
func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) {
	// The exclusive lock (not RLock) is deliberate: besides reading the cache,
	// this function advances rng, which is shared state.
	wakuPX.enrCacheMutex.Lock()
	defer wakuPX.enrCacheMutex.Unlock()

	cacheSize := len(wakuPX.enrCache)
	if cacheSize == 0 {
		return nil, nil
	}

	numItems := cacheSize
	if numPeers < uint64(cacheSize) {
		numItems = int(numPeers) // safe: strictly below cacheSize, which is an int
	}
	if numItems == 0 {
		return nil, nil
	}

	// Pick numItems distinct positions in [0, cacheSize); the records met at
	// those positions while iterating the map are the ones returned.
	chosen := make(map[int]struct{}, numItems)
	for _, pos := range wakuPX.rng.Perm(cacheSize)[:numItems] {
		chosen[pos] = struct{}{}
	}

	result := make([]*pb.PeerInfo, 0, numItems)
	pos := 0
	for _, rec := range wakuPX.enrCache {
		if _, ok := chosen[pos]; ok {
			var b bytes.Buffer
			writer := bufio.NewWriter(&b)
			if err := rec.node.Record().EncodeRLP(writer); err != nil {
				return nil, err
			}
			// Flush moves the buffered bytes into b before they are read.
			if err := writer.Flush(); err != nil {
				return nil, err
			}
			result = append(result, &pb.PeerInfo{
				ENR: b.Bytes(),
			})
		}
		pos++
	}
	return result, nil
}
// cleanCache evicts the oldest records once the ENR cache holds at least
// MaxCacheSize entries: records whose idx is not greater than CacheCleanWindow
// are dropped and the idx of every remaining record is lowered by
// CacheCleanWindow. Below MaxCacheSize the cache is left untouched.
//
// The cache is read and replaced under enrCacheMutex, because stream handlers
// read it concurrently through getENRsFromCache. Callers must therefore NOT
// hold enrCacheMutex when calling cleanCache.
func (wakuPX *WakuPeerExchange) cleanCache() {
	wakuPX.enrCacheMutex.Lock()
	defer wakuPX.enrCacheMutex.Unlock()

	if len(wakuPX.enrCache) < MaxCacheSize {
		return
	}

	r := make(map[enode.ID]peerRecord)
	for k, v := range wakuPX.enrCache {
		if v.idx > CacheCleanWindow {
			v.idx -= CacheCleanWindow
			r[k] = v
		}
	}
	wakuPX.enrCache = r
}
// findPeers runs one discv5 lookup, bounded to two seconds, and stores every
// node it returns in the ENR cache, replacing any record already present for
// the same node ID. It then logs how many nodes the lookup produced and lets
// cleanCache trim the cache. It does nothing when discv5 is not started.
func (wakuPX *WakuPeerExchange) findPeers() {
	if !wakuPX.disc.IsStarted() {
		return
	}

	ctx, cancel := context.WithTimeout(wakuPX.ctx, 2*time.Second)
	defer cancel()

	found, err := wakuPX.disc.FindNodes(ctx, "")
	if err != nil {
		// A failed lookup is only logged; whatever was returned is still processed.
		wakuPX.log.Error("finding peers", zap.Error(err))
	}

	wakuPX.enrCacheMutex.Lock()
	for _, rec := range found {
		id := rec.Node.ID()
		// idx is the cache size before this record is written.
		wakuPX.enrCache[id] = peerRecord{
			node: rec.Node,
			idx:  len(wakuPX.enrCache),
		}
	}
	wakuPX.enrCacheMutex.Unlock()

	wakuPX.log.Info("discovered px peers via discv5", zap.Int("count", len(found)))

	wakuPX.cleanCache()
}
// runPeerExchangeDiscv5Loop feeds the px peer cache from discv5: it looks for
// peers once immediately and then again on every tick of a 30 second ticker,
// until the quit channel is closed. It marks the wait group as done on exit
// and returns right away, with a warning, when no discv5 service is configured.
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() {
	defer wakuPX.wg.Done()

	if wakuPX.disc == nil {
		wakuPX.log.Warn("trying to run discovery v5 (for PX) while it's disabled")
		return
	}

	wakuPX.log.Info("starting peer exchange discovery v5 loop")

	// This loop "competes" with the loop in wakunode2.
	// For the purpose of collecting px peers, 30 sec intervals should be enough.
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		wakuPX.findPeers()

		select {
		case <-wakuPX.quit:
			return
		case <-ticker.C:
			// Next round.
		}
	}
}