diff --git a/waku.go b/waku.go index b4fa1ab1..99ab1292 100644 --- a/waku.go +++ b/waku.go @@ -332,6 +332,18 @@ func main() { Usage: "Discovery can automatically update its ENR with the IP address as seen by other nodes it communicates with.", Destination: &options.DiscV5.AutoUpdate, }, + &cli.BoolFlag{ + Name: "peer-exchange", + Usage: "Enable waku peer exchange protocol (responder side)", + Destination: &options.PeerExchange.Enable, + }, + &cli.GenericFlag{ + Name: "peer-exchange-node", + Usage: "Peer multiaddr to send peer exchange requests to. (enables peer exchange protocol requester side)", + Value: &cliutils.MultiaddrValue{ + Value: &options.PeerExchange.Node, + }, + }, &cli.BoolFlag{ Name: "dns-discovery", Usage: "Enable DNS discovery", diff --git a/waku/node.go b/waku/node.go index 8c724c1f..a304155d 100644 --- a/waku/node.go +++ b/waku/node.go @@ -40,6 +40,7 @@ import ( "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" + "github.com/status-im/go-waku/waku/v2/protocol/peer_exchange" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/rest" @@ -254,6 +255,10 @@ func Execute(options Options) { nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second)))) } + if options.PeerExchange.Enable { + nodeOpts = append(nodeOpts, node.WithPeerExchange()) + } + checkForRLN(logger, options, &nodeOpts) wakuNode, err := node.New(ctx, nodeOpts...) @@ -272,6 +277,21 @@ func Execute(options Options) { if err = wakuNode.DiscV5().Start(); err != nil { logger.Fatal("starting discovery v5", zap.Error(err)) } + + // retrieve and connect to peer exchange peers + if options.PeerExchange.Enable && options.PeerExchange.Node != nil { + logger.Info("retrieving peer info via peer exchange protocol") + + peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, string(peer_exchange.PeerExchangeID_v20alpha1)) + if err != nil { + logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err)) + } else { + desiredOutDegree := 6 // TODO: obtain this from gossipsub D + if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(*peerId)); err != nil { + logger.Error("requesting peers via peer exchange", zap.Error(err)) + } + } + } } if len(options.Relay.Topics.Value()) == 0 { diff --git a/waku/options.go b/waku/options.go index c6d10e1a..972ac99d 100644 --- a/waku/options.go +++ b/waku/options.go @@ -131,6 +131,12 @@ type WSOptions struct { CertPath string } +// PeerExchangeOptions are settings used with the peer exchange protocol +type PeerExchangeOptions struct { + Enable bool + Node *multiaddr.Multiaddr +} + // Options contains all the available features and settings that can be // configured via flags when executing go-waku as a service. type Options struct { @@ -156,6 +162,7 @@ type Options struct { PersistPeers bool UserAgent string + PeerExchange PeerExchangeOptions Websocket WSOptions Relay RelayOptions Store StoreOptions diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 6baf8211..4deeeb05 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -21,7 +21,7 @@ import ( ) type DiscoveryV5 struct { - sync.Mutex + sync.RWMutex discovery.Discovery @@ -33,6 +33,7 @@ type DiscoveryV5 struct { localnode *enode.LocalNode NAT nat.Interface quit chan struct{} + started bool log *zap.Logger @@ -43,13 +44,14 @@ type DiscoveryV5 struct { type peerCache struct { sync.RWMutex - recs map[peer.ID]peerRecord + recs map[peer.ID]PeerRecord rng *rand.Rand } -type peerRecord struct { +type PeerRecord struct { expire int64 - peer peer.AddrInfo + Peer peer.AddrInfo + Node enode.Node } type discV5Parameters struct { @@ -115,7 +117,7 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc wg: &sync.WaitGroup{}, peerCache: peerCache{ rng: rand.New(rand.NewSource(rand.Int63())), - recs: make(map[peer.ID]peerRecord), + recs: make(map[peer.ID]PeerRecord), }, localnode: localnode, config: discover.Config{ @@ -137,6 +139,10 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc }, nil } +func (d *DiscoveryV5) Node() *enode.Node { + return d.localnode.Node() +} + func (d *DiscoveryV5) listen() error { conn, err := net.ListenUDP("udp", d.udpAddr) if err != nil { @@ -174,9 +180,14 @@ func (d *DiscoveryV5) Start() error { d.Lock() defer d.Unlock() + if d.started { + return nil + } + d.wg.Wait() // Waiting for other go routines to stop d.quit = make(chan struct{}, 1) + d.started = true err := d.listen() if err != nil { @@ -190,10 +201,15 @@ func (d *DiscoveryV5) Stop() { d.Lock() defer d.Unlock() + if !d.started { + return + } + close(d.quit) d.listener.Close() d.listener = nil + d.started = false d.log.Info("stopped Discovery V5") @@ -293,9 +309,10 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi } for _, p := range peerAddrs { - d.peerCache.recs[p.ID] = peerRecord{ + d.peerCache.recs[p.ID] = PeerRecord{ expire: time.Now().Unix() + 3600, // Expires in 1hr - peer: p, + Peer: p, + Node: *iterator.Node(), } } @@ -320,7 +337,7 @@ func (d *DiscoveryV5) removeExpiredPeers() int { return newCacheSize } -func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { +func (d *DiscoveryV5) FindNodes(ctx context.Context, topic string, opts ...discovery.Option) ([]PeerRecord, error) { // Get options var options discovery.Options err := options.Apply(opts...) @@ -328,7 +345,7 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco return nil, err } - const maxLimit = 100 + const maxLimit = 600 limit := options.Limit if limit == 0 || limit > maxLimit { limit = maxLimit @@ -368,29 +385,43 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco count = limit } - chPeer := make(chan peer.AddrInfo, count) - perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count] permSet := make(map[int]int) for i, v := range perm { permSet[v] = i } - sendLst := make([]*peer.AddrInfo, count) + sendLst := make([]PeerRecord, count) iter := 0 for k := range d.peerCache.recs { if sendIndex, ok := permSet[iter]; ok { - peerInfo := d.peerCache.recs[k].peer - sendLst[sendIndex] = &peerInfo + sendLst[sendIndex] = d.peerCache.recs[k] } iter++ } - for _, send := range sendLst { - chPeer <- *send + return sendLst, err +} + +func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + records, err := d.FindNodes(ctx, topic, opts...) + if err != nil { + return nil, err + } + + chPeer := make(chan peer.AddrInfo, len(records)) + for _, r := range records { + chPeer <- r.Peer } close(chPeer) return chPeer, err } + +func (d *DiscoveryV5) IsStarted() bool { + d.RLock() + defer d.RUnlock() + + return d.started +} diff --git a/waku/v2/metrics/metrics.go b/waku/v2/metrics/metrics.go index 19b628ba..28c0844e 100644 --- a/waku/v2/metrics/metrics.go +++ b/waku/v2/metrics/metrics.go @@ -18,6 +18,7 @@ var ( FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless) StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless) LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless) + PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless) ) var ( @@ -79,6 +80,12 @@ func RecordLightpushError(ctx context.Context, tagType string) { } } +func RecordPeerExchangeError(ctx context.Context, tagType string) { + if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, PeerExchangeError.M(1)); err != nil { + utils.Logger().Error("failed to record with tags", zap.Error(err)) + } +} + func RecordMessage(ctx context.Context, tagType string, len int) { if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil { utils.Logger().Error("failed to record with tags", zap.Error(err)) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 76ad965c..ad01266e 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -32,6 +32,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/peer_exchange" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/protocol/swap" @@ -78,7 +79,8 @@ type WakuNode struct { addrChan chan ma.Multiaddr - discoveryV5 *discv5.DiscoveryV5 + discoveryV5 *discv5.DiscoveryV5 + peerExchange *peer_exchange.WakuPeerExchange bcaster v2.Broadcaster @@ -291,6 +293,13 @@ func (w *WakuNode) Start() error { } } + if w.opts.enablePeerExchange { + err := w.mountPeerExchange() + if err != nil { + return err + } + } + if w.opts.enableDiscV5 { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...)) } @@ -345,6 +354,14 @@ func (w *WakuNode) Stop() { w.filter.Stop() } + if w.peerExchange != nil { + w.peerExchange.Stop() + } + + if w.discoveryV5 != nil { + w.discoveryV5.Stop() + } + w.relay.Stop() w.lightPush.Stop() w.store.Stop() @@ -405,6 +422,11 @@ func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 { return w.discoveryV5 } +// PeerExchange is used to access any operation related to Peer Exchange +func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange { + return w.peerExchange +} + // Broadcaster is used to access the message broadcaster that is used to push // messages to different protocols func (w *WakuNode) Broadcaster() v2.Broadcaster { @@ -474,6 +496,11 @@ func (w *WakuNode) mountDiscV5() error { return err } +func (w *WakuNode) mountPeerExchange() error { + w.peerExchange = peer_exchange.NewWakuPeerExchange(w.ctx, w.host, w.discoveryV5, w.log) + return w.peerExchange.Start() +} + func (w *WakuNode) startStore() { w.store.Start(w.ctx) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 5362d39d..0ad384e9 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -77,6 +77,8 @@ type WakuNodeParameters struct { discV5Opts []pubsub.DiscoverOpt discV5autoUpdate bool + enablePeerExchange bool + enableRLN bool rlnRelayMemIndex uint rlnRelayPubsubTopic string @@ -277,6 +279,14 @@ func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, disc } } +// WithPeerExchange is a WakuOption used to enable Peer Exchange +func WithPeerExchange() WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enablePeerExchange = true + return nil + } +} + // WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption // accepts a list of WakuFilter gossipsub options to setup the protocol func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption { diff --git a/waku/v2/protocol/pb/generate.go b/waku/v2/protocol/pb/generate.go index 4e422ed5..dc477295 100644 --- a/waku/v2/protocol/pb/generate.go +++ b/waku/v2/protocol/pb/generate.go @@ -5,3 +5,4 @@ package pb //go:generate protoc -I. --gofast_out=. ./waku_message.proto //go:generate protoc -I. --gofast_out=. ./waku_store.proto //go:generate protoc -I. --gofast_out=. ./waku_swap.proto +//go:generate protoc -I. --gofast_out=. ./waku_peer_exchange.proto diff --git a/waku/v2/protocol/pb/waku_peer_exchange.pb.go b/waku/v2/protocol/pb/waku_peer_exchange.pb.go new file mode 100644 index 00000000..9cda32bb --- /dev/null +++ b/waku/v2/protocol/pb/waku_peer_exchange.pb.go @@ -0,0 +1,938 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: waku_peer_exchange.proto + +package pb + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type PeerInfo struct { + ENR []byte `protobuf:"bytes,1,opt,name=ENR,proto3" json:"ENR,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PeerInfo) Reset() { *m = PeerInfo{} } +func (m *PeerInfo) String() string { return proto.CompactTextString(m) } +func (*PeerInfo) ProtoMessage() {} +func (*PeerInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_ce50192ba54b780f, []int{0} +} +func (m *PeerInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PeerInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PeerInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PeerInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_PeerInfo.Merge(m, src) +} +func (m *PeerInfo) XXX_Size() int { + return m.Size() +} +func (m *PeerInfo) XXX_DiscardUnknown() { + xxx_messageInfo_PeerInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_PeerInfo proto.InternalMessageInfo + +func (m *PeerInfo) GetENR() []byte { + if m != nil { + return m.ENR + } + return nil +} + +type PeerExchangeQuery struct { + NumPeers uint64 `protobuf:"varint,1,opt,name=numPeers,proto3" json:"numPeers,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PeerExchangeQuery) Reset() { *m = PeerExchangeQuery{} } +func (m *PeerExchangeQuery) String() string { return proto.CompactTextString(m) } +func (*PeerExchangeQuery) ProtoMessage() {} +func (*PeerExchangeQuery) Descriptor() ([]byte, []int) { + return fileDescriptor_ce50192ba54b780f, []int{1} +} +func (m *PeerExchangeQuery) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PeerExchangeQuery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PeerExchangeQuery.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PeerExchangeQuery) XXX_Merge(src proto.Message) { + xxx_messageInfo_PeerExchangeQuery.Merge(m, src) +} +func (m *PeerExchangeQuery) XXX_Size() int { + return m.Size() +} +func (m *PeerExchangeQuery) XXX_DiscardUnknown() { + xxx_messageInfo_PeerExchangeQuery.DiscardUnknown(m) +} + +var xxx_messageInfo_PeerExchangeQuery proto.InternalMessageInfo + +func (m *PeerExchangeQuery) GetNumPeers() uint64 { + if m != nil { + return m.NumPeers + } + return 0 +} + +type PeerExchangeResponse struct { + PeerInfos []*PeerInfo `protobuf:"bytes,1,rep,name=peerInfos,proto3" json:"peerInfos,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PeerExchangeResponse) Reset() { *m = PeerExchangeResponse{} } +func (m *PeerExchangeResponse) String() string { return proto.CompactTextString(m) } +func (*PeerExchangeResponse) ProtoMessage() {} +func (*PeerExchangeResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_ce50192ba54b780f, []int{2} +} +func (m *PeerExchangeResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PeerExchangeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PeerExchangeResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PeerExchangeResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PeerExchangeResponse.Merge(m, src) +} +func (m *PeerExchangeResponse) XXX_Size() int { + return m.Size() +} +func (m *PeerExchangeResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PeerExchangeResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_PeerExchangeResponse proto.InternalMessageInfo + +func (m *PeerExchangeResponse) GetPeerInfos() []*PeerInfo { + if m != nil { + return m.PeerInfos + } + return nil +} + +type PeerExchangeRPC struct { + Query *PeerExchangeQuery `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` + Response *PeerExchangeResponse `protobuf:"bytes,2,opt,name=response,proto3" json:"response,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PeerExchangeRPC) Reset() { *m = PeerExchangeRPC{} } +func (m *PeerExchangeRPC) String() string { return proto.CompactTextString(m) } +func (*PeerExchangeRPC) ProtoMessage() {} +func (*PeerExchangeRPC) Descriptor() ([]byte, []int) { + return fileDescriptor_ce50192ba54b780f, []int{3} +} +func (m *PeerExchangeRPC) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PeerExchangeRPC) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PeerExchangeRPC.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PeerExchangeRPC) XXX_Merge(src proto.Message) { + xxx_messageInfo_PeerExchangeRPC.Merge(m, src) +} +func (m *PeerExchangeRPC) XXX_Size() int { + return m.Size() +} +func (m *PeerExchangeRPC) XXX_DiscardUnknown() { + xxx_messageInfo_PeerExchangeRPC.DiscardUnknown(m) +} + +var xxx_messageInfo_PeerExchangeRPC proto.InternalMessageInfo + +func (m *PeerExchangeRPC) GetQuery() *PeerExchangeQuery { + if m != nil { + return m.Query + } + return nil +} + +func (m *PeerExchangeRPC) GetResponse() *PeerExchangeResponse { + if m != nil { + return m.Response + } + return nil +} + +func init() { + proto.RegisterType((*PeerInfo)(nil), "PeerInfo") + proto.RegisterType((*PeerExchangeQuery)(nil), "PeerExchangeQuery") + proto.RegisterType((*PeerExchangeResponse)(nil), "PeerExchangeResponse") + proto.RegisterType((*PeerExchangeRPC)(nil), "PeerExchangeRPC") +} + +func init() { proto.RegisterFile("waku_peer_exchange.proto", fileDescriptor_ce50192ba54b780f) } + +var fileDescriptor_ce50192ba54b780f = []byte{ + // 211 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0x4f, 0xcc, 0x2e, + 0x8d, 0x2f, 0x48, 0x4d, 0x2d, 0x8a, 0x4f, 0xad, 0x48, 0xce, 0x48, 0xcc, 0x4b, 0x4f, 0xd5, 0x2b, + 0x28, 0xca, 0x2f, 0xc9, 0x57, 0x92, 0xe1, 0xe2, 0x08, 0x48, 0x4d, 0x2d, 0xf2, 0xcc, 0x4b, 0xcb, + 0x17, 0x12, 0xe0, 0x62, 0x76, 0xf5, 0x0b, 0x92, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0x31, + 0x95, 0xf4, 0xb9, 0x04, 0x41, 0xb2, 0xae, 0x50, 0x3d, 0x81, 0xa5, 0xa9, 0x45, 0x95, 0x42, 0x52, + 0x5c, 0x1c, 0x79, 0xa5, 0xb9, 0x20, 0xf1, 0x62, 0xb0, 0x5a, 0x96, 0x20, 0x38, 0x5f, 0xc9, 0x9e, + 0x4b, 0x04, 0x59, 0x43, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x90, 0x3a, 0x17, 0x67, + 0x01, 0xd4, 0x1a, 0x90, 0x26, 0x66, 0x0d, 0x6e, 0x23, 0x4e, 0x3d, 0x98, 0xc5, 0x41, 0x08, 0x39, + 0xa5, 0x3c, 0x2e, 0x7e, 0x14, 0x03, 0x02, 0x9c, 0x85, 0x34, 0xb8, 0x58, 0x0b, 0x41, 0x16, 0x83, + 0x2d, 0xe3, 0x36, 0x12, 0xd2, 0xc3, 0x70, 0x52, 0x10, 0x44, 0x81, 0x90, 0x21, 0x17, 0x47, 0x11, + 0xd4, 0x46, 0x09, 0x26, 0xb0, 0x62, 0x51, 0x3d, 0x6c, 0xce, 0x09, 0x82, 0x2b, 0x73, 0x12, 0x38, + 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63, + 0x48, 0x62, 0x03, 0x07, 0x8c, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x99, 0xd5, 0xbb, 0xb6, 0x34, + 0x01, 0x00, 0x00, +} + +func (m *PeerInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PeerInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PeerInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.ENR) > 0 { + i -= len(m.ENR) + copy(dAtA[i:], m.ENR) + i = encodeVarintWakuPeerExchange(dAtA, i, uint64(len(m.ENR))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *PeerExchangeQuery) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PeerExchangeQuery) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PeerExchangeQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.NumPeers != 0 { + i = encodeVarintWakuPeerExchange(dAtA, i, uint64(m.NumPeers)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *PeerExchangeResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PeerExchangeResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PeerExchangeResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.PeerInfos) > 0 { + for iNdEx := len(m.PeerInfos) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.PeerInfos[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *PeerExchangeRPC) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PeerExchangeRPC) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PeerExchangeRPC) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Response != nil { + { + size, err := m.Response.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Query != nil { + { + size, err := m.Query.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintWakuPeerExchange(dAtA []byte, offset int, v uint64) int { + offset -= sovWakuPeerExchange(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *PeerInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ENR) + if l > 0 { + n += 1 + l + sovWakuPeerExchange(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *PeerExchangeQuery) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.NumPeers != 0 { + n += 1 + sovWakuPeerExchange(uint64(m.NumPeers)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *PeerExchangeResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.PeerInfos) > 0 { + for _, e := range m.PeerInfos { + l = e.Size() + n += 1 + l + sovWakuPeerExchange(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *PeerExchangeRPC) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Query != nil { + l = m.Query.Size() + n += 1 + l + sovWakuPeerExchange(uint64(l)) + } + if m.Response != nil { + l = m.Response.Size() + n += 1 + l + sovWakuPeerExchange(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovWakuPeerExchange(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozWakuPeerExchange(x uint64) (n int) { + return sovWakuPeerExchange(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *PeerInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PeerInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PeerInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ENR", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthWakuPeerExchange + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ENR = append(m.ENR[:0], dAtA[iNdEx:postIndex]...) + if m.ENR == nil { + m.ENR = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWakuPeerExchange(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PeerExchangeQuery) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PeerExchangeQuery: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PeerExchangeQuery: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NumPeers", wireType) + } + m.NumPeers = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NumPeers |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipWakuPeerExchange(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PeerExchangeResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PeerExchangeResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PeerExchangeResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PeerInfos", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWakuPeerExchange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PeerInfos = append(m.PeerInfos, &PeerInfo{}) + if err := m.PeerInfos[len(m.PeerInfos)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWakuPeerExchange(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PeerExchangeRPC) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PeerExchangeRPC: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PeerExchangeRPC: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWakuPeerExchange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Query == nil { + m.Query = &PeerExchangeQuery{} + } + if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Response", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWakuPeerExchange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Response == nil { + m.Response = &PeerExchangeResponse{} + } + if err := m.Response.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWakuPeerExchange(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthWakuPeerExchange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipWakuPeerExchange(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWakuPeerExchange + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthWakuPeerExchange + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupWakuPeerExchange + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthWakuPeerExchange + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthWakuPeerExchange = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowWakuPeerExchange = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupWakuPeerExchange = fmt.Errorf("proto: unexpected end of group") +) diff --git a/waku/v2/protocol/pb/waku_peer_exchange.proto b/waku/v2/protocol/pb/waku_peer_exchange.proto new file mode 100644 index 00000000..0a77911c --- /dev/null +++ b/waku/v2/protocol/pb/waku_peer_exchange.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +message PeerInfo { + bytes ENR = 1; +} + +message PeerExchangeQuery { + uint64 numPeers = 1; // number of peers requested +} + +message PeerExchangeResponse { + repeated PeerInfo peerInfos = 1; +} + +message PeerExchangeRPC { + PeerExchangeQuery query = 1; + PeerExchangeResponse response = 2; +} \ No newline at end of file diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go new file mode 100644 index 00000000..ca945bf7 --- /dev/null +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go @@ -0,0 +1,364 @@ +package peer_exchange + +import ( + "bufio" + "bytes" + "context" + "errors" + "math" + "math/rand" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/rlp" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-msgio/protoio" + "github.com/status-im/go-waku/logging" + "github.com/status-im/go-waku/waku/v2/discv5" + "github.com/status-im/go-waku/waku/v2/metrics" + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier +const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1") +const MaxCacheSize = 1000 +const CacheCleanWindow = 200 +const dialTimeout = 7 * time.Second + +var ( + ErrNoPeersAvailable = errors.New("no suitable remote peers") + ErrInvalidId = errors.New("invalid request id") +) + +type peerRecord struct { + node enode.Node + idx int +} + +type WakuPeerExchange struct { + h host.Host + ctx context.Context + disc *discv5.DiscoveryV5 + + log *zap.Logger + quit chan struct{} + wg sync.WaitGroup + + enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + enrCacheMutex sync.RWMutex + rng *rand.Rand + + started bool +} + +// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct +func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange { + wakuPX := new(WakuPeerExchange) + wakuPX.ctx = ctx + wakuPX.h = h + wakuPX.disc = disc + wakuPX.log = log.Named("wakupx") + wakuPX.enrCache = make(map[enode.ID]peerRecord) + wakuPX.rng = rand.New(rand.NewSource(rand.Int63())) + return wakuPX +} + +// Start inits the peer exchange protocol +func (wakuPX *WakuPeerExchange) Start() error { + wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest) + wakuPX.log.Info("Peer exchange protocol started") + wakuPX.started = true + wakuPX.quit = make(chan struct{}, 1) + + wakuPX.wg.Add(1) + go wakuPX.runPeerExchangeDiscv5Loop() + + return nil +} + +func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse) error { + var peers []peer.AddrInfo + for _, p := range response.PeerInfos { + enrRecord := &enr.Record{} + buf := bytes.NewBuffer(p.ENR) + + err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR)))) + if err != nil { + wakuPX.log.Error("converting bytes to enr", zap.Error(err)) + return err + } + + enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord) + if err != nil { + wakuPX.log.Error("creating enode record", zap.Error(err)) + + return err + } + + peerInfo, err := utils.EnodeToPeerInfo(enodeRecord) + if err != nil { + return err + } + + if wakuPX.h.Network().Connectedness(peerInfo.ID) != network.Connected { + peers = append(peers, *peerInfo) + } + } + + if len(peers) != 0 { + log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) + for _, p := range peers { + func(p peer.AddrInfo) { + ctx, cancel := context.WithTimeout(wakuPX.ctx, dialTimeout) + defer cancel() + err := wakuPX.h.Connect(ctx, p) + if err != nil { + log.Info("connecting to peer", zap.String("peer", p.ID.Pretty()), zap.Error(err)) + } + }(p) + } + } + + return nil +} + +func (wakuPX *WakuPeerExchange) onRequest(s network.Stream) { + defer s.Close() + logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer())) + requestRPC := &pb.PeerExchangeRPC{} + reader := protoio.NewDelimitedReader(s, math.MaxInt32) + err := reader.ReadMsg(requestRPC) + if err != nil { + logger.Error("reading request", zap.Error(err)) + metrics.RecordPeerExchangeError(wakuPX.ctx, "decodeRpcFailure") + return + } + + if requestRPC.Query != nil { + logger.Info("request received") + err := wakuPX.respond(requestRPC.Query.NumPeers, s.Conn().RemotePeer()) + if err != nil { + logger.Error("responding", zap.Error(err)) + metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure") + return + } + } + + if requestRPC.Response != nil { + logger.Info("response received") + err := wakuPX.handleResponse(requestRPC.Response) + if err != nil { + logger.Error("handling response", zap.Error(err)) + metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure") + return + } + } +} + +func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error { + params := new(PeerExchangeParameters) + params.host = wakuPX.h + params.log = wakuPX.log + + optList := DefaultOptions(wakuPX.h) + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + + if params.selectedPeer == "" { + metrics.RecordPeerExchangeError(wakuPX.ctx, "dialError") + return ErrNoPeersAvailable + } + + requestRPC := &pb.PeerExchangeRPC{ + Query: &pb.PeerExchangeQuery{ + NumPeers: uint64(numPeers), + }, + } + + return wakuPX.sendPeerExchangeRPCToPeer(requestRPC, params.selectedPeer) +} + +// IsStarted returns if the peer exchange protocol has been mounted or not +func (wakuPX *WakuPeerExchange) IsStarted() bool { + return wakuPX.started +} + +// Stop unmounts the peer exchange protocol +func (wakuPX *WakuPeerExchange) Stop() { + if wakuPX.started { + wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1) + wakuPX.started = false + close(wakuPX.quit) + wakuPX.wg.Wait() + } +} + +func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRPC, peerID peer.ID) error { + logger := wakuPX.log.With(logging.HostID("peer", peerID)) + + // We connect first so dns4 addresses are resolved (NewStream does not do it) + err := wakuPX.h.Connect(wakuPX.ctx, wakuPX.h.Peerstore().PeerInfo(peerID)) + if err != nil { + logger.Error("connecting peer", zap.Error(err)) + return err + } + + connOpt, err := wakuPX.h.NewStream(wakuPX.ctx, peerID, PeerExchangeID_v20alpha1) + if err != nil { + logger.Error("creating stream to peer", zap.Error(err)) + return err + } + defer connOpt.Close() + + writer := protoio.NewDelimitedWriter(connOpt) + err = writer.WriteMsg(rpc) + if err != nil { + logger.Error("writing response", zap.Error(err)) + return err + } + + return nil +} + +func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error { + records, err := wakuPX.getENRsFromCache(numPeers) + if err != nil { + return err + } + + responseRPC := &pb.PeerExchangeRPC{} + responseRPC.Response = new(pb.PeerExchangeResponse) + responseRPC.Response.PeerInfos = records + + return wakuPX.sendPeerExchangeRPCToPeer(responseRPC, peerID) +} + +func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) { + wakuPX.enrCacheMutex.Lock() + defer wakuPX.enrCacheMutex.Unlock() + + if len(wakuPX.enrCache) == 0 { + return nil, nil + } + + numItems := int(numPeers) + if len(wakuPX.enrCache) < int(numPeers) { + numItems = len(wakuPX.enrCache) + } + + perm := wakuPX.rng.Perm(len(wakuPX.enrCache))[0:numItems] + permSet := make(map[int]int) + for i, v := range perm { + permSet[v] = i + } + + var result []*pb.PeerInfo + iter := 0 + for k := range wakuPX.enrCache { + if _, ok := permSet[iter]; ok { + var b bytes.Buffer + writer := bufio.NewWriter(&b) + enode := wakuPX.enrCache[k] + + err := enode.node.Record().EncodeRLP(writer) + if err != nil { + return nil, err + } + + writer.Flush() + + result = append(result, &pb.PeerInfo{ + ENR: b.Bytes(), + }) + } + iter++ + } + + return result, nil +} + +func (wakuPX *WakuPeerExchange) cleanCache() { + if len(wakuPX.enrCache) < MaxCacheSize { + return + } + + r := make(map[enode.ID]peerRecord) + for k, v := range wakuPX.enrCache { + if v.idx > CacheCleanWindow { + v.idx -= CacheCleanWindow + r[k] = v + } + } + + wakuPX.enrCache = r +} + +func (wakuPX *WakuPeerExchange) findPeers() { + if !wakuPX.disc.IsStarted() { + return + } + + ctx, cancel := context.WithTimeout(wakuPX.ctx, 2*time.Second) + defer cancel() + peerRecords, err := wakuPX.disc.FindNodes(ctx, "") + if err != nil { + wakuPX.log.Error("finding peers", zap.Error(err)) + } + + cnt := 0 + wakuPX.enrCacheMutex.Lock() + for _, p := range peerRecords { + cnt++ + wakuPX.enrCache[p.Node.ID()] = peerRecord{ + idx: len(wakuPX.enrCache), + node: p.Node, + } + } + wakuPX.enrCacheMutex.Unlock() + + wakuPX.log.Info("discovered px peers via discv5", zap.Int("count", cnt)) + + wakuPX.cleanCache() +} + +func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() { + defer wakuPX.wg.Done() + + // Runs a discv5 loop adding new peers to the px peer cache + if wakuPX.disc == nil { + wakuPX.log.Warn("trying to run discovery v5 (for PX) while it's disabled") + return + } + + wakuPX.log.Info("starting peer exchange discovery v5 loop") + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + // This loop "competes" with the loop in wakunode2 + // For the purpose of collecting px peers, 30 sec intervals should be enough + + wakuPX.findPeers() + + for { + select { + case <-wakuPX.quit: + return + + case <-ticker.C: + wakuPX.findPeers() + } + + } +} diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go new file mode 100644 index 00000000..63d14570 --- /dev/null +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -0,0 +1,58 @@ +package peer_exchange + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +type PeerExchangeParameters struct { + host host.Host + selectedPeer peer.ID + log *zap.Logger +} + +type PeerExchangeOption func(*PeerExchangeParameters) + +// WithPeer is an option used to specify the peerID to push a waku message to +func WithPeer(p peer.ID) PeerExchangeOption { + return func(params *PeerExchangeParameters) { + params.selectedPeer = p + } +} + +// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store +// to push a waku message to +func WithAutomaticPeerSelection() PeerExchangeOption { + return func(params *PeerExchangeParameters) { + p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), params.log) + if err == nil { + params.selectedPeer = *p + } else { + params.log.Info("selecting peer", zap.Error(err)) + } + } +} + +// WithFastestPeerSelection is an option used to select a peer from the peer store +// with the lowest ping +func WithFastestPeerSelection(ctx context.Context) PeerExchangeOption { + return func(params *PeerExchangeParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), params.log) + if err == nil { + params.selectedPeer = *p + } else { + params.log.Info("selecting peer", zap.Error(err)) + } + } +} + +// DefaultOptions are the default options to be used when using the lightpush protocol +func DefaultOptions(host host.Host) []PeerExchangeOption { + return []PeerExchangeOption{ + WithAutomaticPeerSelection(), + } +} diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go new file mode 100644 index 00000000..bf091201 --- /dev/null +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -0,0 +1,162 @@ +package peer_exchange + +import ( + "context" + "crypto/ecdsa" + "fmt" + "math" + "net" + "strconv" + "testing" + "time" + + gcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/tests" + "github.com/status-im/go-waku/waku/v2/discv5" + "github.com/status-im/go-waku/waku/v2/utils" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" +) + +func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) { + privKey, err := gcrypto.GenerateKey() + require.NoError(t, err) + + sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) + + port, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + + sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + require.NoError(t, err) + + host, err := libp2p.New( + libp2p.ListenAddrs(sourceMultiAddr), + libp2p.Identity(sPrivKey), + ) + require.NoError(t, err) + + return host, port, privKey +} + +func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { + ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + return nil, err + } + + portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) + if err != nil { + return nil, err + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + return &net.TCPAddr{ + IP: net.ParseIP(ipStr), + Port: port, + }, nil +} + +func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags)) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetStaticIP(ipAddr.IP) + + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting udpPort", zap.Int("port", udpPort)) + } + + if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) + } + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + + return localnode, nil +} + +func TestRetrieveProvidePeerExchangePeers(t *testing.T) { + // H1 + host1, _, prvKey1 := createHost(t) + udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip1, _ := extractIP(host1.Addrs()[0]) + l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + require.NoError(t, err) + d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(udpPort1)) + require.NoError(t, err) + + // H2 + host2, _, prvKey2 := createHost(t) + ip2, _ := extractIP(host2.Addrs()[0]) + udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + require.NoError(t, err) + d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(udpPort2), discv5.WithBootnodes([]*enode.Node{d1.Node()})) + require.NoError(t, err) + + // H3 + host3, _, _ := createHost(t) + + defer d1.Stop() + defer d2.Stop() + defer host1.Close() + defer host2.Close() + defer host3.Close() + + err = d1.Start() + require.NoError(t, err) + + err = d2.Start() + require.NoError(t, err) + + // mount peer exchange + px1 := NewWakuPeerExchange(context.Background(), host1, d1, utils.Logger()) + px3 := NewWakuPeerExchange(context.Background(), host3, nil, utils.Logger()) + + err = px1.Start() + require.NoError(t, err) + + err = px3.Start() + require.NoError(t, err) + + time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic + + host3.Peerstore().AddAddrs(host1.ID(), host1.Addrs(), peerstore.PermanentAddrTTL) + err = host3.Peerstore().AddProtocols(host1.ID(), string(PeerExchangeID_v20alpha1)) + require.NoError(t, err) + + err = px3.Request(context.Background(), 1) + require.NoError(t, err) + + time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic + + peer2Info := host3.Peerstore().PeerInfo(host2.ID()) + require.Equal(t, host2.Addrs()[0], peer2Info.Addrs[0]) + + px1.Stop() + px3.Stop() +}