mirror of https://github.com/status-im/go-waku.git
feat: peer exchange
This commit is contained in:
parent
861f37f15a
commit
8b64934e2c
12
waku.go
12
waku.go
|
@ -332,6 +332,18 @@ func main() {
|
|||
Usage: "Discovery can automatically update its ENR with the IP address as seen by other nodes it communicates with.",
|
||||
Destination: &options.DiscV5.AutoUpdate,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "peer-exchange",
|
||||
Usage: "Enable waku peer exchange protocol (responder side)",
|
||||
Destination: &options.PeerExchange.Enable,
|
||||
},
|
||||
&cli.GenericFlag{
|
||||
Name: "peer-exchange-node",
|
||||
Usage: "Peer multiaddr to send peer exchange requests to. (enables peer exchange protocol requester side)",
|
||||
Value: &cliutils.MultiaddrValue{
|
||||
Value: &options.PeerExchange.Node,
|
||||
},
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "dns-discovery",
|
||||
Usage: "Enable DNS discovery",
|
||||
|
|
20
waku/node.go
20
waku/node.go
|
@ -40,6 +40,7 @@ import (
|
|||
"github.com/status-im/go-waku/waku/v2/node"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/peer_exchange"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
||||
"github.com/status-im/go-waku/waku/v2/rest"
|
||||
|
@ -254,6 +255,10 @@ func Execute(options Options) {
|
|||
nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
|
||||
}
|
||||
|
||||
if options.PeerExchange.Enable {
|
||||
nodeOpts = append(nodeOpts, node.WithPeerExchange())
|
||||
}
|
||||
|
||||
checkForRLN(logger, options, &nodeOpts)
|
||||
|
||||
wakuNode, err := node.New(ctx, nodeOpts...)
|
||||
|
@ -272,6 +277,21 @@ func Execute(options Options) {
|
|||
if err = wakuNode.DiscV5().Start(); err != nil {
|
||||
logger.Fatal("starting discovery v5", zap.Error(err))
|
||||
}
|
||||
|
||||
// retrieve and connect to peer exchange peers
|
||||
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
|
||||
logger.Info("retrieving peer info via peer exchange protocol")
|
||||
|
||||
peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, string(peer_exchange.PeerExchangeID_v20alpha1))
|
||||
if err != nil {
|
||||
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
|
||||
} else {
|
||||
desiredOutDegree := 6 // TODO: obtain this from gossipsub D
|
||||
if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(*peerId)); err != nil {
|
||||
logger.Error("requesting peers via peer exchange", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(options.Relay.Topics.Value()) == 0 {
|
||||
|
|
|
@ -131,6 +131,12 @@ type WSOptions struct {
|
|||
CertPath string
|
||||
}
|
||||
|
||||
// PeerExchangeOptions are settings used with the peer exchange protocol
|
||||
type PeerExchangeOptions struct {
|
||||
Enable bool
|
||||
Node *multiaddr.Multiaddr
|
||||
}
|
||||
|
||||
// Options contains all the available features and settings that can be
|
||||
// configured via flags when executing go-waku as a service.
|
||||
type Options struct {
|
||||
|
@ -156,6 +162,7 @@ type Options struct {
|
|||
PersistPeers bool
|
||||
UserAgent string
|
||||
|
||||
PeerExchange PeerExchangeOptions
|
||||
Websocket WSOptions
|
||||
Relay RelayOptions
|
||||
Store StoreOptions
|
||||
|
|
|
@ -21,7 +21,7 @@ import (
|
|||
)
|
||||
|
||||
type DiscoveryV5 struct {
|
||||
sync.Mutex
|
||||
sync.RWMutex
|
||||
|
||||
discovery.Discovery
|
||||
|
||||
|
@ -33,6 +33,7 @@ type DiscoveryV5 struct {
|
|||
localnode *enode.LocalNode
|
||||
NAT nat.Interface
|
||||
quit chan struct{}
|
||||
started bool
|
||||
|
||||
log *zap.Logger
|
||||
|
||||
|
@ -43,13 +44,14 @@ type DiscoveryV5 struct {
|
|||
|
||||
type peerCache struct {
|
||||
sync.RWMutex
|
||||
recs map[peer.ID]peerRecord
|
||||
recs map[peer.ID]PeerRecord
|
||||
rng *rand.Rand
|
||||
}
|
||||
|
||||
type peerRecord struct {
|
||||
type PeerRecord struct {
|
||||
expire int64
|
||||
peer peer.AddrInfo
|
||||
Peer peer.AddrInfo
|
||||
Node enode.Node
|
||||
}
|
||||
|
||||
type discV5Parameters struct {
|
||||
|
@ -115,7 +117,7 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
|
|||
wg: &sync.WaitGroup{},
|
||||
peerCache: peerCache{
|
||||
rng: rand.New(rand.NewSource(rand.Int63())),
|
||||
recs: make(map[peer.ID]peerRecord),
|
||||
recs: make(map[peer.ID]PeerRecord),
|
||||
},
|
||||
localnode: localnode,
|
||||
config: discover.Config{
|
||||
|
@ -137,6 +139,10 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) Node() *enode.Node {
|
||||
return d.localnode.Node()
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) listen() error {
|
||||
conn, err := net.ListenUDP("udp", d.udpAddr)
|
||||
if err != nil {
|
||||
|
@ -174,9 +180,14 @@ func (d *DiscoveryV5) Start() error {
|
|||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
if d.started {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.wg.Wait() // Waiting for other go routines to stop
|
||||
|
||||
d.quit = make(chan struct{}, 1)
|
||||
d.started = true
|
||||
|
||||
err := d.listen()
|
||||
if err != nil {
|
||||
|
@ -190,10 +201,15 @@ func (d *DiscoveryV5) Stop() {
|
|||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
if !d.started {
|
||||
return
|
||||
}
|
||||
|
||||
close(d.quit)
|
||||
|
||||
d.listener.Close()
|
||||
d.listener = nil
|
||||
d.started = false
|
||||
|
||||
d.log.Info("stopped Discovery V5")
|
||||
|
||||
|
@ -293,9 +309,10 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi
|
|||
}
|
||||
|
||||
for _, p := range peerAddrs {
|
||||
d.peerCache.recs[p.ID] = peerRecord{
|
||||
d.peerCache.recs[p.ID] = PeerRecord{
|
||||
expire: time.Now().Unix() + 3600, // Expires in 1hr
|
||||
peer: p,
|
||||
Peer: p,
|
||||
Node: *iterator.Node(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -320,7 +337,7 @@ func (d *DiscoveryV5) removeExpiredPeers() int {
|
|||
return newCacheSize
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
|
||||
func (d *DiscoveryV5) FindNodes(ctx context.Context, topic string, opts ...discovery.Option) ([]PeerRecord, error) {
|
||||
// Get options
|
||||
var options discovery.Options
|
||||
err := options.Apply(opts...)
|
||||
|
@ -328,7 +345,7 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
|
|||
return nil, err
|
||||
}
|
||||
|
||||
const maxLimit = 100
|
||||
const maxLimit = 600
|
||||
limit := options.Limit
|
||||
if limit == 0 || limit > maxLimit {
|
||||
limit = maxLimit
|
||||
|
@ -368,29 +385,43 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
|
|||
count = limit
|
||||
}
|
||||
|
||||
chPeer := make(chan peer.AddrInfo, count)
|
||||
|
||||
perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count]
|
||||
permSet := make(map[int]int)
|
||||
for i, v := range perm {
|
||||
permSet[v] = i
|
||||
}
|
||||
|
||||
sendLst := make([]*peer.AddrInfo, count)
|
||||
sendLst := make([]PeerRecord, count)
|
||||
iter := 0
|
||||
for k := range d.peerCache.recs {
|
||||
if sendIndex, ok := permSet[iter]; ok {
|
||||
peerInfo := d.peerCache.recs[k].peer
|
||||
sendLst[sendIndex] = &peerInfo
|
||||
sendLst[sendIndex] = d.peerCache.recs[k]
|
||||
}
|
||||
iter++
|
||||
}
|
||||
|
||||
for _, send := range sendLst {
|
||||
chPeer <- *send
|
||||
return sendLst, err
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
|
||||
records, err := d.FindNodes(ctx, topic, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chPeer := make(chan peer.AddrInfo, len(records))
|
||||
for _, r := range records {
|
||||
chPeer <- r.Peer
|
||||
}
|
||||
|
||||
close(chPeer)
|
||||
|
||||
return chPeer, err
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) IsStarted() bool {
|
||||
d.RLock()
|
||||
defer d.RUnlock()
|
||||
|
||||
return d.started
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ var (
|
|||
FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
|
||||
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
|
||||
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
|
||||
PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -79,6 +80,12 @@ func RecordLightpushError(ctx context.Context, tagType string) {
|
|||
}
|
||||
}
|
||||
|
||||
func RecordPeerExchangeError(ctx context.Context, tagType string) {
|
||||
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, PeerExchangeError.M(1)); err != nil {
|
||||
utils.Logger().Error("failed to record with tags", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func RecordMessage(ctx context.Context, tagType string, len int) {
|
||||
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil {
|
||||
utils.Logger().Error("failed to record with tags", zap.Error(err))
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/status-im/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/peer_exchange"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/swap"
|
||||
|
@ -78,7 +79,8 @@ type WakuNode struct {
|
|||
|
||||
addrChan chan ma.Multiaddr
|
||||
|
||||
discoveryV5 *discv5.DiscoveryV5
|
||||
discoveryV5 *discv5.DiscoveryV5
|
||||
peerExchange *peer_exchange.WakuPeerExchange
|
||||
|
||||
bcaster v2.Broadcaster
|
||||
|
||||
|
@ -291,6 +293,13 @@ func (w *WakuNode) Start() error {
|
|||
}
|
||||
}
|
||||
|
||||
if w.opts.enablePeerExchange {
|
||||
err := w.mountPeerExchange()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if w.opts.enableDiscV5 {
|
||||
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
|
||||
}
|
||||
|
@ -345,6 +354,14 @@ func (w *WakuNode) Stop() {
|
|||
w.filter.Stop()
|
||||
}
|
||||
|
||||
if w.peerExchange != nil {
|
||||
w.peerExchange.Stop()
|
||||
}
|
||||
|
||||
if w.discoveryV5 != nil {
|
||||
w.discoveryV5.Stop()
|
||||
}
|
||||
|
||||
w.relay.Stop()
|
||||
w.lightPush.Stop()
|
||||
w.store.Stop()
|
||||
|
@ -405,6 +422,11 @@ func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 {
|
|||
return w.discoveryV5
|
||||
}
|
||||
|
||||
// PeerExchange is used to access any operation related to Peer Exchange
|
||||
func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange {
|
||||
return w.peerExchange
|
||||
}
|
||||
|
||||
// Broadcaster is used to access the message broadcaster that is used to push
|
||||
// messages to different protocols
|
||||
func (w *WakuNode) Broadcaster() v2.Broadcaster {
|
||||
|
@ -474,6 +496,11 @@ func (w *WakuNode) mountDiscV5() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (w *WakuNode) mountPeerExchange() error {
|
||||
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.ctx, w.host, w.discoveryV5, w.log)
|
||||
return w.peerExchange.Start()
|
||||
}
|
||||
|
||||
func (w *WakuNode) startStore() {
|
||||
w.store.Start(w.ctx)
|
||||
|
||||
|
|
|
@ -77,6 +77,8 @@ type WakuNodeParameters struct {
|
|||
discV5Opts []pubsub.DiscoverOpt
|
||||
discV5autoUpdate bool
|
||||
|
||||
enablePeerExchange bool
|
||||
|
||||
enableRLN bool
|
||||
rlnRelayMemIndex uint
|
||||
rlnRelayPubsubTopic string
|
||||
|
@ -277,6 +279,14 @@ func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, disc
|
|||
}
|
||||
}
|
||||
|
||||
// WithPeerExchange is a WakuOption used to enable Peer Exchange
|
||||
func WithPeerExchange() WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.enablePeerExchange = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
|
||||
// accepts a list of WakuFilter gossipsub options to setup the protocol
|
||||
func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
|
||||
|
|
|
@ -5,3 +5,4 @@ package pb
|
|||
//go:generate protoc -I. --gofast_out=. ./waku_message.proto
|
||||
//go:generate protoc -I. --gofast_out=. ./waku_store.proto
|
||||
//go:generate protoc -I. --gofast_out=. ./waku_swap.proto
|
||||
//go:generate protoc -I. --gofast_out=. ./waku_peer_exchange.proto
|
||||
|
|
|
@ -0,0 +1,938 @@
|
|||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: waku_peer_exchange.proto
|
||||
|
||||
package pb
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type PeerInfo struct {
|
||||
ENR []byte `protobuf:"bytes,1,opt,name=ENR,proto3" json:"ENR,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *PeerInfo) Reset() { *m = PeerInfo{} }
|
||||
func (m *PeerInfo) String() string { return proto.CompactTextString(m) }
|
||||
func (*PeerInfo) ProtoMessage() {}
|
||||
func (*PeerInfo) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_ce50192ba54b780f, []int{0}
|
||||
}
|
||||
func (m *PeerInfo) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *PeerInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_PeerInfo.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *PeerInfo) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_PeerInfo.Merge(m, src)
|
||||
}
|
||||
func (m *PeerInfo) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *PeerInfo) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_PeerInfo.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_PeerInfo proto.InternalMessageInfo
|
||||
|
||||
func (m *PeerInfo) GetENR() []byte {
|
||||
if m != nil {
|
||||
return m.ENR
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type PeerExchangeQuery struct {
|
||||
NumPeers uint64 `protobuf:"varint,1,opt,name=numPeers,proto3" json:"numPeers,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *PeerExchangeQuery) Reset() { *m = PeerExchangeQuery{} }
|
||||
func (m *PeerExchangeQuery) String() string { return proto.CompactTextString(m) }
|
||||
func (*PeerExchangeQuery) ProtoMessage() {}
|
||||
func (*PeerExchangeQuery) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_ce50192ba54b780f, []int{1}
|
||||
}
|
||||
func (m *PeerExchangeQuery) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *PeerExchangeQuery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_PeerExchangeQuery.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *PeerExchangeQuery) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_PeerExchangeQuery.Merge(m, src)
|
||||
}
|
||||
func (m *PeerExchangeQuery) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *PeerExchangeQuery) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_PeerExchangeQuery.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_PeerExchangeQuery proto.InternalMessageInfo
|
||||
|
||||
func (m *PeerExchangeQuery) GetNumPeers() uint64 {
|
||||
if m != nil {
|
||||
return m.NumPeers
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type PeerExchangeResponse struct {
|
||||
PeerInfos []*PeerInfo `protobuf:"bytes,1,rep,name=peerInfos,proto3" json:"peerInfos,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *PeerExchangeResponse) Reset() { *m = PeerExchangeResponse{} }
|
||||
func (m *PeerExchangeResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*PeerExchangeResponse) ProtoMessage() {}
|
||||
func (*PeerExchangeResponse) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_ce50192ba54b780f, []int{2}
|
||||
}
|
||||
func (m *PeerExchangeResponse) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *PeerExchangeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_PeerExchangeResponse.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *PeerExchangeResponse) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_PeerExchangeResponse.Merge(m, src)
|
||||
}
|
||||
func (m *PeerExchangeResponse) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *PeerExchangeResponse) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_PeerExchangeResponse.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_PeerExchangeResponse proto.InternalMessageInfo
|
||||
|
||||
func (m *PeerExchangeResponse) GetPeerInfos() []*PeerInfo {
|
||||
if m != nil {
|
||||
return m.PeerInfos
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type PeerExchangeRPC struct {
|
||||
Query *PeerExchangeQuery `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
|
||||
Response *PeerExchangeResponse `protobuf:"bytes,2,opt,name=response,proto3" json:"response,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *PeerExchangeRPC) Reset() { *m = PeerExchangeRPC{} }
|
||||
func (m *PeerExchangeRPC) String() string { return proto.CompactTextString(m) }
|
||||
func (*PeerExchangeRPC) ProtoMessage() {}
|
||||
func (*PeerExchangeRPC) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_ce50192ba54b780f, []int{3}
|
||||
}
|
||||
func (m *PeerExchangeRPC) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *PeerExchangeRPC) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_PeerExchangeRPC.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *PeerExchangeRPC) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_PeerExchangeRPC.Merge(m, src)
|
||||
}
|
||||
func (m *PeerExchangeRPC) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *PeerExchangeRPC) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_PeerExchangeRPC.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_PeerExchangeRPC proto.InternalMessageInfo
|
||||
|
||||
func (m *PeerExchangeRPC) GetQuery() *PeerExchangeQuery {
|
||||
if m != nil {
|
||||
return m.Query
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *PeerExchangeRPC) GetResponse() *PeerExchangeResponse {
|
||||
if m != nil {
|
||||
return m.Response
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*PeerInfo)(nil), "PeerInfo")
|
||||
proto.RegisterType((*PeerExchangeQuery)(nil), "PeerExchangeQuery")
|
||||
proto.RegisterType((*PeerExchangeResponse)(nil), "PeerExchangeResponse")
|
||||
proto.RegisterType((*PeerExchangeRPC)(nil), "PeerExchangeRPC")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("waku_peer_exchange.proto", fileDescriptor_ce50192ba54b780f) }
|
||||
|
||||
var fileDescriptor_ce50192ba54b780f = []byte{
|
||||
// 211 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0x4f, 0xcc, 0x2e,
|
||||
0x8d, 0x2f, 0x48, 0x4d, 0x2d, 0x8a, 0x4f, 0xad, 0x48, 0xce, 0x48, 0xcc, 0x4b, 0x4f, 0xd5, 0x2b,
|
||||
0x28, 0xca, 0x2f, 0xc9, 0x57, 0x92, 0xe1, 0xe2, 0x08, 0x48, 0x4d, 0x2d, 0xf2, 0xcc, 0x4b, 0xcb,
|
||||
0x17, 0x12, 0xe0, 0x62, 0x76, 0xf5, 0x0b, 0x92, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0x31,
|
||||
0x95, 0xf4, 0xb9, 0x04, 0x41, 0xb2, 0xae, 0x50, 0x3d, 0x81, 0xa5, 0xa9, 0x45, 0x95, 0x42, 0x52,
|
||||
0x5c, 0x1c, 0x79, 0xa5, 0xb9, 0x20, 0xf1, 0x62, 0xb0, 0x5a, 0x96, 0x20, 0x38, 0x5f, 0xc9, 0x9e,
|
||||
0x4b, 0x04, 0x59, 0x43, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x90, 0x3a, 0x17, 0x67,
|
||||
0x01, 0xd4, 0x1a, 0x90, 0x26, 0x66, 0x0d, 0x6e, 0x23, 0x4e, 0x3d, 0x98, 0xc5, 0x41, 0x08, 0x39,
|
||||
0xa5, 0x3c, 0x2e, 0x7e, 0x14, 0x03, 0x02, 0x9c, 0x85, 0x34, 0xb8, 0x58, 0x0b, 0x41, 0x16, 0x83,
|
||||
0x2d, 0xe3, 0x36, 0x12, 0xd2, 0xc3, 0x70, 0x52, 0x10, 0x44, 0x81, 0x90, 0x21, 0x17, 0x47, 0x11,
|
||||
0xd4, 0x46, 0x09, 0x26, 0xb0, 0x62, 0x51, 0x3d, 0x6c, 0xce, 0x09, 0x82, 0x2b, 0x73, 0x12, 0x38,
|
||||
0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63,
|
||||
0x48, 0x62, 0x03, 0x07, 0x8c, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x99, 0xd5, 0xbb, 0xb6, 0x34,
|
||||
0x01, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *PeerInfo) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *PeerInfo) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *PeerInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if len(m.ENR) > 0 {
|
||||
i -= len(m.ENR)
|
||||
copy(dAtA[i:], m.ENR)
|
||||
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(len(m.ENR)))
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *PeerExchangeQuery) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *PeerExchangeQuery) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *PeerExchangeQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.NumPeers != 0 {
|
||||
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(m.NumPeers))
|
||||
i--
|
||||
dAtA[i] = 0x8
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *PeerExchangeResponse) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *PeerExchangeResponse) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *PeerExchangeResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if len(m.PeerInfos) > 0 {
|
||||
for iNdEx := len(m.PeerInfos) - 1; iNdEx >= 0; iNdEx-- {
|
||||
{
|
||||
size, err := m.PeerInfos[iNdEx].MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
}
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *PeerExchangeRPC) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *PeerExchangeRPC) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *PeerExchangeRPC) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.Response != nil {
|
||||
{
|
||||
size, err := m.Response.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0x12
|
||||
}
|
||||
if m.Query != nil {
|
||||
{
|
||||
size, err := m.Query.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func encodeVarintWakuPeerExchange(dAtA []byte, offset int, v uint64) int {
|
||||
offset -= sovWakuPeerExchange(v)
|
||||
base := offset
|
||||
for v >= 1<<7 {
|
||||
dAtA[offset] = uint8(v&0x7f | 0x80)
|
||||
v >>= 7
|
||||
offset++
|
||||
}
|
||||
dAtA[offset] = uint8(v)
|
||||
return base
|
||||
}
|
||||
func (m *PeerInfo) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
l = len(m.ENR)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovWakuPeerExchange(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *PeerExchangeQuery) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.NumPeers != 0 {
|
||||
n += 1 + sovWakuPeerExchange(uint64(m.NumPeers))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *PeerExchangeResponse) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if len(m.PeerInfos) > 0 {
|
||||
for _, e := range m.PeerInfos {
|
||||
l = e.Size()
|
||||
n += 1 + l + sovWakuPeerExchange(uint64(l))
|
||||
}
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *PeerExchangeRPC) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.Query != nil {
|
||||
l = m.Query.Size()
|
||||
n += 1 + l + sovWakuPeerExchange(uint64(l))
|
||||
}
|
||||
if m.Response != nil {
|
||||
l = m.Response.Size()
|
||||
n += 1 + l + sovWakuPeerExchange(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovWakuPeerExchange(x uint64) (n int) {
|
||||
return (math_bits.Len64(x|1) + 6) / 7
|
||||
}
|
||||
func sozWakuPeerExchange(x uint64) (n int) {
|
||||
return sovWakuPeerExchange(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||
}
|
||||
func (m *PeerInfo) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: PeerInfo: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: PeerInfo: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field ENR", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.ENR = append(m.ENR[:0], dAtA[iNdEx:postIndex]...)
|
||||
if m.ENR == nil {
|
||||
m.ENR = []byte{}
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *PeerExchangeQuery) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: PeerExchangeQuery: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: PeerExchangeQuery: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field NumPeers", wireType)
|
||||
}
|
||||
m.NumPeers = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.NumPeers |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *PeerExchangeResponse) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: PeerExchangeResponse: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: PeerExchangeResponse: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field PeerInfos", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.PeerInfos = append(m.PeerInfos, &PeerInfo{})
|
||||
if err := m.PeerInfos[len(m.PeerInfos)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *PeerExchangeRPC) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: PeerExchangeRPC: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: PeerExchangeRPC: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Query == nil {
|
||||
m.Query = &PeerExchangeQuery{}
|
||||
}
|
||||
if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Response", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Response == nil {
|
||||
m.Response = &PeerExchangeResponse{}
|
||||
}
|
||||
if err := m.Response.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipWakuPeerExchange(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
depth := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
wireType := int(wire & 0x7)
|
||||
switch wireType {
|
||||
case 0:
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx++
|
||||
if dAtA[iNdEx-1] < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 1:
|
||||
iNdEx += 8
|
||||
case 2:
|
||||
var length int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowWakuPeerExchange
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
length |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if length < 0 {
|
||||
return 0, ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
iNdEx += length
|
||||
case 3:
|
||||
depth++
|
||||
case 4:
|
||||
if depth == 0 {
|
||||
return 0, ErrUnexpectedEndOfGroupWakuPeerExchange
|
||||
}
|
||||
depth--
|
||||
case 5:
|
||||
iNdEx += 4
|
||||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
if iNdEx < 0 {
|
||||
return 0, ErrInvalidLengthWakuPeerExchange
|
||||
}
|
||||
if depth == 0 {
|
||||
return iNdEx, nil
|
||||
}
|
||||
}
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidLengthWakuPeerExchange = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||
ErrIntOverflowWakuPeerExchange = fmt.Errorf("proto: integer overflow")
|
||||
ErrUnexpectedEndOfGroupWakuPeerExchange = fmt.Errorf("proto: unexpected end of group")
|
||||
)
|
|
@ -0,0 +1,18 @@
|
|||
syntax = "proto3";
|
||||
|
||||
message PeerInfo {
|
||||
bytes ENR = 1;
|
||||
}
|
||||
|
||||
message PeerExchangeQuery {
|
||||
uint64 numPeers = 1; // number of peers requested
|
||||
}
|
||||
|
||||
message PeerExchangeResponse {
|
||||
repeated PeerInfo peerInfos = 1;
|
||||
}
|
||||
|
||||
message PeerExchangeRPC {
|
||||
PeerExchangeQuery query = 1;
|
||||
PeerExchangeResponse response = 2;
|
||||
}
|
|
@ -0,0 +1,364 @@
|
|||
package peer_exchange
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
"github.com/status-im/go-waku/logging"
|
||||
"github.com/status-im/go-waku/waku/v2/discv5"
|
||||
"github.com/status-im/go-waku/waku/v2/metrics"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/status-im/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
|
||||
const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1")
|
||||
const MaxCacheSize = 1000
|
||||
const CacheCleanWindow = 200
|
||||
const dialTimeout = 7 * time.Second
|
||||
|
||||
var (
|
||||
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
||||
ErrInvalidId = errors.New("invalid request id")
|
||||
)
|
||||
|
||||
type peerRecord struct {
|
||||
node enode.Node
|
||||
idx int
|
||||
}
|
||||
|
||||
type WakuPeerExchange struct {
|
||||
h host.Host
|
||||
ctx context.Context
|
||||
disc *discv5.DiscoveryV5
|
||||
|
||||
log *zap.Logger
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
|
||||
enrCacheMutex sync.RWMutex
|
||||
rng *rand.Rand
|
||||
|
||||
started bool
|
||||
}
|
||||
|
||||
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
|
||||
func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange {
|
||||
wakuPX := new(WakuPeerExchange)
|
||||
wakuPX.ctx = ctx
|
||||
wakuPX.h = h
|
||||
wakuPX.disc = disc
|
||||
wakuPX.log = log.Named("wakupx")
|
||||
wakuPX.enrCache = make(map[enode.ID]peerRecord)
|
||||
wakuPX.rng = rand.New(rand.NewSource(rand.Int63()))
|
||||
return wakuPX
|
||||
}
|
||||
|
||||
// Start inits the peer exchange protocol
|
||||
func (wakuPX *WakuPeerExchange) Start() error {
|
||||
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest)
|
||||
wakuPX.log.Info("Peer exchange protocol started")
|
||||
wakuPX.started = true
|
||||
wakuPX.quit = make(chan struct{}, 1)
|
||||
|
||||
wakuPX.wg.Add(1)
|
||||
go wakuPX.runPeerExchangeDiscv5Loop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse) error {
|
||||
var peers []peer.AddrInfo
|
||||
for _, p := range response.PeerInfos {
|
||||
enrRecord := &enr.Record{}
|
||||
buf := bytes.NewBuffer(p.ENR)
|
||||
|
||||
err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR))))
|
||||
if err != nil {
|
||||
wakuPX.log.Error("converting bytes to enr", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
||||
if err != nil {
|
||||
wakuPX.log.Error("creating enode record", zap.Error(err))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
peerInfo, err := utils.EnodeToPeerInfo(enodeRecord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if wakuPX.h.Network().Connectedness(peerInfo.ID) != network.Connected {
|
||||
peers = append(peers, *peerInfo)
|
||||
}
|
||||
}
|
||||
|
||||
if len(peers) != 0 {
|
||||
log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
|
||||
for _, p := range peers {
|
||||
func(p peer.AddrInfo) {
|
||||
ctx, cancel := context.WithTimeout(wakuPX.ctx, dialTimeout)
|
||||
defer cancel()
|
||||
err := wakuPX.h.Connect(ctx, p)
|
||||
if err != nil {
|
||||
log.Info("connecting to peer", zap.String("peer", p.ID.Pretty()), zap.Error(err))
|
||||
}
|
||||
}(p)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) onRequest(s network.Stream) {
|
||||
defer s.Close()
|
||||
logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
||||
requestRPC := &pb.PeerExchangeRPC{}
|
||||
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
|
||||
err := reader.ReadMsg(requestRPC)
|
||||
if err != nil {
|
||||
logger.Error("reading request", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "decodeRpcFailure")
|
||||
return
|
||||
}
|
||||
|
||||
if requestRPC.Query != nil {
|
||||
logger.Info("request received")
|
||||
err := wakuPX.respond(requestRPC.Query.NumPeers, s.Conn().RemotePeer())
|
||||
if err != nil {
|
||||
logger.Error("responding", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if requestRPC.Response != nil {
|
||||
logger.Info("response received")
|
||||
err := wakuPX.handleResponse(requestRPC.Response)
|
||||
if err != nil {
|
||||
logger.Error("handling response", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
|
||||
params := new(PeerExchangeParameters)
|
||||
params.host = wakuPX.h
|
||||
params.log = wakuPX.log
|
||||
|
||||
optList := DefaultOptions(wakuPX.h)
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
if params.selectedPeer == "" {
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "dialError")
|
||||
return ErrNoPeersAvailable
|
||||
}
|
||||
|
||||
requestRPC := &pb.PeerExchangeRPC{
|
||||
Query: &pb.PeerExchangeQuery{
|
||||
NumPeers: uint64(numPeers),
|
||||
},
|
||||
}
|
||||
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(requestRPC, params.selectedPeer)
|
||||
}
|
||||
|
||||
// IsStarted returns if the peer exchange protocol has been mounted or not
|
||||
func (wakuPX *WakuPeerExchange) IsStarted() bool {
|
||||
return wakuPX.started
|
||||
}
|
||||
|
||||
// Stop unmounts the peer exchange protocol
|
||||
func (wakuPX *WakuPeerExchange) Stop() {
|
||||
if wakuPX.started {
|
||||
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
|
||||
wakuPX.started = false
|
||||
close(wakuPX.quit)
|
||||
wakuPX.wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRPC, peerID peer.ID) error {
|
||||
logger := wakuPX.log.With(logging.HostID("peer", peerID))
|
||||
|
||||
// We connect first so dns4 addresses are resolved (NewStream does not do it)
|
||||
err := wakuPX.h.Connect(wakuPX.ctx, wakuPX.h.Peerstore().PeerInfo(peerID))
|
||||
if err != nil {
|
||||
logger.Error("connecting peer", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
connOpt, err := wakuPX.h.NewStream(wakuPX.ctx, peerID, PeerExchangeID_v20alpha1)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer connOpt.Close()
|
||||
|
||||
writer := protoio.NewDelimitedWriter(connOpt)
|
||||
err = writer.WriteMsg(rpc)
|
||||
if err != nil {
|
||||
logger.Error("writing response", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error {
|
||||
records, err := wakuPX.getENRsFromCache(numPeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
responseRPC := &pb.PeerExchangeRPC{}
|
||||
responseRPC.Response = new(pb.PeerExchangeResponse)
|
||||
responseRPC.Response.PeerInfos = records
|
||||
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(responseRPC, peerID)
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) {
|
||||
wakuPX.enrCacheMutex.Lock()
|
||||
defer wakuPX.enrCacheMutex.Unlock()
|
||||
|
||||
if len(wakuPX.enrCache) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
numItems := int(numPeers)
|
||||
if len(wakuPX.enrCache) < int(numPeers) {
|
||||
numItems = len(wakuPX.enrCache)
|
||||
}
|
||||
|
||||
perm := wakuPX.rng.Perm(len(wakuPX.enrCache))[0:numItems]
|
||||
permSet := make(map[int]int)
|
||||
for i, v := range perm {
|
||||
permSet[v] = i
|
||||
}
|
||||
|
||||
var result []*pb.PeerInfo
|
||||
iter := 0
|
||||
for k := range wakuPX.enrCache {
|
||||
if _, ok := permSet[iter]; ok {
|
||||
var b bytes.Buffer
|
||||
writer := bufio.NewWriter(&b)
|
||||
enode := wakuPX.enrCache[k]
|
||||
|
||||
err := enode.node.Record().EncodeRLP(writer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writer.Flush()
|
||||
|
||||
result = append(result, &pb.PeerInfo{
|
||||
ENR: b.Bytes(),
|
||||
})
|
||||
}
|
||||
iter++
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) cleanCache() {
|
||||
if len(wakuPX.enrCache) < MaxCacheSize {
|
||||
return
|
||||
}
|
||||
|
||||
r := make(map[enode.ID]peerRecord)
|
||||
for k, v := range wakuPX.enrCache {
|
||||
if v.idx > CacheCleanWindow {
|
||||
v.idx -= CacheCleanWindow
|
||||
r[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
wakuPX.enrCache = r
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) findPeers() {
|
||||
if !wakuPX.disc.IsStarted() {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(wakuPX.ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
peerRecords, err := wakuPX.disc.FindNodes(ctx, "")
|
||||
if err != nil {
|
||||
wakuPX.log.Error("finding peers", zap.Error(err))
|
||||
}
|
||||
|
||||
cnt := 0
|
||||
wakuPX.enrCacheMutex.Lock()
|
||||
for _, p := range peerRecords {
|
||||
cnt++
|
||||
wakuPX.enrCache[p.Node.ID()] = peerRecord{
|
||||
idx: len(wakuPX.enrCache),
|
||||
node: p.Node,
|
||||
}
|
||||
}
|
||||
wakuPX.enrCacheMutex.Unlock()
|
||||
|
||||
wakuPX.log.Info("discovered px peers via discv5", zap.Int("count", cnt))
|
||||
|
||||
wakuPX.cleanCache()
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() {
|
||||
defer wakuPX.wg.Done()
|
||||
|
||||
// Runs a discv5 loop adding new peers to the px peer cache
|
||||
if wakuPX.disc == nil {
|
||||
wakuPX.log.Warn("trying to run discovery v5 (for PX) while it's disabled")
|
||||
return
|
||||
}
|
||||
|
||||
wakuPX.log.Info("starting peer exchange discovery v5 loop")
|
||||
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
// This loop "competes" with the loop in wakunode2
|
||||
// For the purpose of collecting px peers, 30 sec intervals should be enough
|
||||
|
||||
wakuPX.findPeers()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-wakuPX.quit:
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
wakuPX.findPeers()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
package peer_exchange
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/status-im/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type PeerExchangeParameters struct {
|
||||
host host.Host
|
||||
selectedPeer peer.ID
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
type PeerExchangeOption func(*PeerExchangeParameters)
|
||||
|
||||
// WithPeer is an option used to specify the peerID to push a waku message to
|
||||
func WithPeer(p peer.ID) PeerExchangeOption {
|
||||
return func(params *PeerExchangeParameters) {
|
||||
params.selectedPeer = p
|
||||
}
|
||||
}
|
||||
|
||||
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
|
||||
// to push a waku message to
|
||||
func WithAutomaticPeerSelection() PeerExchangeOption {
|
||||
return func(params *PeerExchangeParameters) {
|
||||
p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), params.log)
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
params.log.Info("selecting peer", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithFastestPeerSelection is an option used to select a peer from the peer store
|
||||
// with the lowest ping
|
||||
func WithFastestPeerSelection(ctx context.Context) PeerExchangeOption {
|
||||
return func(params *PeerExchangeParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), params.log)
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
params.log.Info("selecting peer", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultOptions are the default options to be used when using the lightpush protocol
|
||||
func DefaultOptions(host host.Host) []PeerExchangeOption {
|
||||
return []PeerExchangeOption{
|
||||
WithAutomaticPeerSelection(),
|
||||
}
|
||||
}
|
|
@ -0,0 +1,162 @@
|
|||
package peer_exchange
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
gcrypto "github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/status-im/go-waku/tests"
|
||||
"github.com/status-im/go-waku/waku/v2/discv5"
|
||||
"github.com/status-im/go-waku/waku/v2/utils"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
|
||||
)
|
||||
|
||||
func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) {
|
||||
privKey, err := gcrypto.GenerateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey))
|
||||
|
||||
port, err := tests.FindFreePort(t, "127.0.0.1", 3)
|
||||
require.NoError(t, err)
|
||||
|
||||
sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
|
||||
require.NoError(t, err)
|
||||
|
||||
host, err := libp2p.New(
|
||||
libp2p.ListenAddrs(sourceMultiAddr),
|
||||
libp2p.Identity(sPrivKey),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return host, port, privKey
|
||||
}
|
||||
|
||||
func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
|
||||
ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
port, err := strconv.Atoi(portStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &net.TCPAddr{
|
||||
IP: net.ParseIP(ipStr),
|
||||
Port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) {
|
||||
db, err := enode.OpenDB("")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
localnode := enode.NewLocalNode(db, priv)
|
||||
localnode.SetFallbackUDP(udpPort)
|
||||
localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags))
|
||||
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
|
||||
localnode.SetStaticIP(ipAddr.IP)
|
||||
|
||||
if udpPort > 0 && udpPort <= math.MaxUint16 {
|
||||
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
|
||||
} else {
|
||||
log.Error("setting udpPort", zap.Int("port", udpPort))
|
||||
}
|
||||
|
||||
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
|
||||
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
|
||||
} else {
|
||||
log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
|
||||
}
|
||||
|
||||
if advertiseAddr != nil {
|
||||
localnode.SetStaticIP(*advertiseAddr)
|
||||
}
|
||||
|
||||
return localnode, nil
|
||||
}
|
||||
|
||||
func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
|
||||
// H1
|
||||
host1, _, prvKey1 := createHost(t)
|
||||
udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3)
|
||||
require.NoError(t, err)
|
||||
ip1, _ := extractIP(host1.Addrs()[0])
|
||||
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
|
||||
require.NoError(t, err)
|
||||
d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(udpPort1))
|
||||
require.NoError(t, err)
|
||||
|
||||
// H2
|
||||
host2, _, prvKey2 := createHost(t)
|
||||
ip2, _ := extractIP(host2.Addrs()[0])
|
||||
udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3)
|
||||
require.NoError(t, err)
|
||||
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
|
||||
require.NoError(t, err)
|
||||
d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(udpPort2), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
|
||||
require.NoError(t, err)
|
||||
|
||||
// H3
|
||||
host3, _, _ := createHost(t)
|
||||
|
||||
defer d1.Stop()
|
||||
defer d2.Stop()
|
||||
defer host1.Close()
|
||||
defer host2.Close()
|
||||
defer host3.Close()
|
||||
|
||||
err = d1.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = d2.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
// mount peer exchange
|
||||
px1 := NewWakuPeerExchange(context.Background(), host1, d1, utils.Logger())
|
||||
px3 := NewWakuPeerExchange(context.Background(), host3, nil, utils.Logger())
|
||||
|
||||
err = px1.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = px3.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic
|
||||
|
||||
host3.Peerstore().AddAddrs(host1.ID(), host1.Addrs(), peerstore.PermanentAddrTTL)
|
||||
err = host3.Peerstore().AddProtocols(host1.ID(), string(PeerExchangeID_v20alpha1))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = px3.Request(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic
|
||||
|
||||
peer2Info := host3.Peerstore().PeerInfo(host2.ID())
|
||||
require.Equal(t, host2.Addrs()[0], peer2Info.Addrs[0])
|
||||
|
||||
px1.Stop()
|
||||
px3.Stop()
|
||||
}
|
Loading…
Reference in New Issue