go-waku/waku/v2/protocol/peer_exchange/protocol.go

215 lines
5.8 KiB
Go
Raw Normal View History

2022-10-23 13:13:43 +00:00
package peer_exchange
import (
"context"
"errors"
"fmt"
2022-10-23 13:13:43 +00:00
"math"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
2023-04-20 18:51:13 +00:00
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
2022-10-23 13:13:43 +00:00
"go.uber.org/zap"
"golang.org/x/time/rate"
2022-10-23 13:13:43 +00:00
)
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1")
const MaxCacheSize = 1000
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrInvalidID = errors.New("invalid request id")
2022-10-23 13:13:43 +00:00
)
2023-07-31 18:58:50 +00:00
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
type PeerConnector interface {
Subscribe(context.Context, <-chan service.PeerData)
}
2022-10-23 13:13:43 +00:00
type WakuPeerExchange struct {
h host.Host
disc *discv5.DiscoveryV5
pm *peermanager.PeerManager
metrics Metrics
log *zap.Logger
*service.CommonService
peerConnector PeerConnector
enrCache *enrCache
limiter *rate.Limiter
2022-10-23 13:13:43 +00:00
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
// Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) {
2022-10-23 13:13:43 +00:00
wakuPX := new(WakuPeerExchange)
wakuPX.disc = disc
wakuPX.metrics = newMetrics(reg)
2022-10-23 13:13:43 +00:00
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = newEnrCache(MaxCacheSize)
wakuPX.peerConnector = peerConnector
wakuPX.pm = pm
wakuPX.CommonService = service.NewCommonService()
params := &PeerExchangeParameters{}
for _, opt := range opts {
opt(params)
}
wakuPX.limiter = params.limiter
return wakuPX, nil
2022-10-23 13:13:43 +00:00
}
// SetHost sets the host to be able to mount or consume a protocol
2023-04-17 00:04:12 +00:00
func (wakuPX *WakuPeerExchange) SetHost(h host.Host) {
wakuPX.h = h
}
2022-10-23 13:13:43 +00:00
// Start inits the peer exchange protocol
func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
return wakuPX.CommonService.Start(ctx, wakuPX.start)
}
func (wakuPX *WakuPeerExchange) start() error {
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest())
2022-10-23 13:13:43 +00:00
wakuPX.WaitGroup().Add(1)
go wakuPX.runPeerExchangeDiscv5Loop(wakuPX.Context())
wakuPX.log.Info("Peer exchange protocol started")
2022-10-23 13:13:43 +00:00
return nil
}
2023-10-22 01:34:52 +00:00
func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
return func(stream network.Stream) {
logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
wakuPX.metrics.RecordError(rateLimitFailure)
wakuPX.log.Error("exceeds the rate limit")
// TODO: peer exchange protocol should contain an err field
return
}
requestRPC := &pb.PeerExchangeRPC{}
2023-10-22 01:34:52 +00:00
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err := reader.ReadMsg(requestRPC)
2022-10-23 13:13:43 +00:00
if err != nil {
logger.Error("reading request", zap.Error(err))
wakuPX.metrics.RecordError(decodeRPCFailure)
2023-10-22 01:34:52 +00:00
if err := stream.Reset(); err != nil {
wakuPX.log.Error("resetting connection", zap.Error(err))
}
2022-10-23 13:13:43 +00:00
return
}
if requestRPC.Query != nil {
logger.Info("request received")
records, err := wakuPX.enrCache.getENRs(int(requestRPC.Query.NumPeers), nil)
if err != nil {
logger.Error("obtaining enrs from cache", zap.Error(err))
wakuPX.metrics.RecordError(pxFailure)
return
}
responseRPC := &pb.PeerExchangeRPC{}
responseRPC.Response = new(pb.PeerExchangeResponse)
responseRPC.Response.PeerInfos = records
2023-10-22 01:34:52 +00:00
writer := pbio.NewDelimitedWriter(stream)
err = writer.WriteMsg(responseRPC)
if err != nil {
logger.Error("writing response", zap.Error(err))
wakuPX.metrics.RecordError(pxFailure)
2023-10-22 01:34:52 +00:00
if err := stream.Reset(); err != nil {
wakuPX.log.Error("resetting connection", zap.Error(err))
}
return
}
2022-10-23 13:13:43 +00:00
}
2023-10-22 01:34:52 +00:00
stream.Close()
2022-10-23 13:13:43 +00:00
}
}
// Stop unmounts the peer exchange protocol
func (wakuPX *WakuPeerExchange) Stop() {
wakuPX.CommonService.Stop(func() {
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
})
2022-10-23 13:13:43 +00:00
}
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
2023-07-27 18:14:14 +00:00
iterator, err := wakuPX.disc.PeerIterator()
2022-10-23 13:13:43 +00:00
if err != nil {
return fmt.Errorf("obtaining iterator: %w", err)
2022-10-23 13:13:43 +00:00
}
// Closing iterator
defer iterator.Close()
2023-01-31 14:45:19 +00:00
peerCnt := 0
for discv5.DelayedHasNext(ctx, iterator, &peerCnt) {
2023-04-20 18:51:13 +00:00
_, addresses, err := enr.Multiaddress(iterator.Node())
if err != nil {
wakuPX.log.Error("extracting multiaddrs from enr", zap.Error(err))
continue
}
if len(addresses) == 0 {
continue
}
err = wakuPX.enrCache.updateCache(iterator.Node())
if err != nil {
wakuPX.log.Error("adding peer to cache", zap.Error(err))
continue
}
select {
case <-ctx.Done():
return nil
default:
}
}
return nil
2022-10-23 13:13:43 +00:00
}
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
defer wakuPX.WaitGroup().Done()
2022-10-23 13:13:43 +00:00
// Runs a discv5 loop adding new peers to the px peer cache
if wakuPX.disc == nil {
wakuPX.log.Warn("trying to run discovery v5 (for PX) while it's disabled")
return
}
for {
err := wakuPX.iterate(ctx)
if err != nil {
wakuPX.log.Debug("iterating peer exchange", zap.Error(err))
}
t := time.NewTimer(5 * time.Second)
2022-10-23 13:13:43 +00:00
select {
case <-t.C:
t.Stop()
case <-ctx.Done():
t.Stop()
return
2022-10-23 13:13:43 +00:00
}
}
}