feat: discoveryV5 - part2 (#150)

This commit is contained in:
Richard Ramos 2021-11-17 12:19:42 -04:00 committed by GitHub
parent 978bedfafa
commit 817759c235
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 429 additions and 101 deletions

View File

@ -45,7 +45,7 @@ func main() {
wakuNode, err := node.New(ctx,
node.WithPrivateKey(prvKey),
node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithHostAddress(hostAddr),
node.WithWakuRelay(),
)
if err != nil {

View File

@ -79,7 +79,7 @@ func main() {
opts := []node.WakuNodeOption{
node.WithPrivateKey(prvKey),
node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithHostAddress(hostAddr),
node.WithWakuStore(false, false),
node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second),
}

View File

@ -64,7 +64,7 @@ func main() {
fullNode, err := node.New(ctx,
node.WithPrivateKey(prvKey1),
node.WithHostAddress([]*net.TCPAddr{hostAddr1}),
node.WithHostAddress(hostAddr1),
node.WithWakuRelay(),
node.WithWakuFilter(true),
)
@ -76,7 +76,7 @@ func main() {
lightNode, err := node.New(ctx,
node.WithPrivateKey(prvKey2),
node.WithHostAddress([]*net.TCPAddr{hostAddr2}),
node.WithHostAddress(hostAddr2),
node.WithWakuFilter(false),
)
if err != nil {

View File

@ -27,7 +27,7 @@ func TestBasicSendingReceiving(t *testing.T) {
wakuNode, err := node.New(ctx,
node.WithPrivateKey(prvKey),
node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithHostAddress(hostAddr),
node.WithWakuRelay(),
)
require.NoError(t, err)

View File

@ -17,7 +17,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
dssql "github.com/ipfs/go-ds-sql"
manet "github.com/multiformats/go-multiaddr/net"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
@ -53,6 +52,26 @@ func failOnErr(err error, msg string) {
}
}
func freePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
port := l.Addr().(*net.TCPAddr).Port
err = l.Close()
if err != nil {
return 0, err
}
return port, nil
}
// Execute starts a go-waku node with settings determined by the Options parameter
func Execute(options Options) {
if options.GenerateKey {
@ -89,14 +108,25 @@ func Execute(options Options) {
nodeOpts := []node.WakuNodeOption{
node.WithPrivateKey(prvKey),
node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithHostAddress(hostAddr),
node.WithKeepAlive(time.Duration(options.KeepAlive) * time.Second),
}
if options.AdvertiseAddress != "" {
advertiseAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", options.AdvertiseAddress, options.Port))
failOnErr(err, "invalid advertise address")
nodeOpts = append(nodeOpts, node.WithAdvertiseAddress([]*net.TCPAddr{advertiseAddr}, options.EnableWS, options.WSPort))
failOnErr(err, "Invalid advertise address")
if advertiseAddr.Port == 0 {
for {
p, err := freePort()
if err == nil {
advertiseAddr.Port = p
break
}
}
}
nodeOpts = append(nodeOpts, node.WithAdvertiseAddress(advertiseAddr, options.EnableWS, options.WSPort))
}
if options.EnableWS {
@ -110,6 +140,9 @@ func Execute(options Options) {
}
libp2pOpts := node.DefaultLibP2POptions
if options.AdvertiseAddress == "" {
libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) // Attempt to open ports using uPNP for NATed hosts.)
}
if options.UseDB {
// Create persistent peerstore
@ -162,6 +195,18 @@ func Execute(options Options) {
nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
}
if options.DiscV5.Enable {
var bootnodes []*enode.Node
for _, addr := range options.DiscV5.Nodes {
bootnode, err := enode.Parse(enode.ValidSchemes, addr)
if err != nil {
log.Fatal("could not parse enr: ", err)
}
bootnodes = append(bootnodes, bootnode)
}
nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
}
wakuNode, err := node.New(ctx, nodeOpts...)
failOnErr(err, "Wakunode")
@ -171,10 +216,25 @@ func Execute(options Options) {
addPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1)
addPeers(wakuNode, options.Filter.Nodes, filter.FilterID_v20beta1)
if options.DNSDiscovery.Enable || options.DiscV5.Enable {
for _, addr := range wakuNode.ListenAddresses() {
ip, _ := addr.ValueForProtocol(multiaddr.P_IP4)
// TODO: use enode.New
enr := enode.NewV4(&prvKey.PublicKey, net.ParseIP(ip), hostAddr.Port, 0)
log.Info("ENR: ", enr)
}
}
if err = wakuNode.Start(); err != nil {
log.Fatal(fmt.Errorf("could not start waku node, %w", err))
}
if options.DiscV5.Enable {
if err = wakuNode.DiscV5().Start(); err != nil {
log.Fatal(fmt.Errorf("could not start discovery v5, %w", err))
}
}
if len(options.Relay.Topics) == 0 {
options.Relay.Topics = []string{string(relay.DefaultWakuTopic)}
}
@ -197,12 +257,6 @@ func Execute(options Options) {
}
if options.DNSDiscovery.Enable {
for _, addr := range wakuNode.ListenAddresses() {
ip, _ := addr.ValueForProtocol(multiaddr.P_IP4)
enr := enode.NewV4(&prvKey.PublicKey, net.ParseIP(ip), hostAddr.Port, 0)
log.Info("ENR: ", enr)
}
if options.DNSDiscovery.URL != "" {
log.Info("attempting DNS discovery with ", options.DNSDiscovery.URL)
multiaddresses, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver))
@ -366,30 +420,16 @@ func printListeningAddresses(ctx context.Context, nodeOpts []node.WakuNodeOption
}
var libp2pOpts []config.Option
libp2pOpts = append(libp2pOpts, params.Identity())
libp2pOpts = append(libp2pOpts,
params.Identity(),
libp2p.ListenAddrs(params.MultiAddresses()...),
)
if options.AdvertiseAddress != "" {
advertiseAddress, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", options.AdvertiseAddress, options.Port))
if err != nil {
panic(err)
}
libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(func([]multiaddr.Multiaddr) []multiaddr.Multiaddr {
addr, _ := manet.FromNetAddr(advertiseAddress)
var result []multiaddr.Multiaddr
result = append(result, addr)
if options.EnableWS {
wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", options.AdvertiseAddress, options.WSPort))
result = append(result, wsMa)
}
return result
}))
addrFactory := params.AddressFactory()
if addrFactory != nil {
libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrFactory))
}
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrs(params.MultiAddresses()...))
h, err := libp2p.New(ctx, libp2pOpts...)
if err != nil {
panic(err)

View File

@ -6,11 +6,19 @@ type RendezvousOptions struct {
Enable bool `long:"rendezvous" description:"Enable rendezvous protocol for peer discovery"`
Nodes []string `long:"rendezvous-node" description:"Multiaddr of a waku2 rendezvous node. Option may be repeated"`
}
type RendezvousServerOptions struct {
Enable bool `long:"rendezvous-server" description:"Node will act as rendezvous server"`
DBPath string `long:"rendezvous-db-path" description:"Path where peer records database will be stored" default:"/tmp/rendezvous"`
}
type DiscV5Options struct {
Enable bool `long:"discv5-discovery" description:"Enable discovering nodes via Node Discovery v5"`
Nodes []string `long:"discv5-bootstrap-node" description:"Text-encoded ENR for bootstrap node. Used when connecting to the network. Option may be repeated"`
Port int `long:"discv5-udp-port" description:"Listening UDP port for Node Discovery v5." default:"9000"`
AutoUpdate bool `long:"discv5-enr-auto-update" description:"Discovery can automatically update its ENR with the IP address as seen by other nodes it communicates with." `
}
type RelayOptions struct {
Disable bool `long:"no-relay" description:"Disable relay protocol"`
Topics []string `long:"topics" description:"List of topics to listen"`
@ -76,10 +84,10 @@ type RPCServerOptions struct {
// Options contains all the available features and settings that can be
// configured via flags when executing go-waku as a service.
type Options struct {
Port int `short:"p" long:"port" description:"Libp2p TCP listening port (0 for random)" default:"9000"`
Port int `short:"p" long:"port" description:"Libp2p TCP listening port (0 for random)" default:"60000"`
Address string `long:"address" description:"Listening address" default:"0.0.0.0"`
EnableWS bool `long:"ws" description:"Enable websockets support"`
WSPort int `long:"ws-port" description:"Libp2p TCP listening port for websocket connection (0 for random)" default:"9001"`
WSPort int `long:"ws-port" description:"Libp2p TCP listening port for websocket connection (0 for random)" default:"60001"`
WSAddress string `long:"ws-address" description:"Listening address for websocket connections" default:"0.0.0.0"`
NodeKey string `long:"nodekey" description:"P2P node private key as hex. Can also be set with GOWAKU-NODEKEY env variable (default random)"`
KeyFile string `long:"key-file" description:"Path to a file containing the private key for the P2P node" default:"./nodekey"`
@ -97,6 +105,7 @@ type Options struct {
Store StoreOptions `group:"Store Options"`
Filter FilterOptions `group:"Filter Options"`
LightPush LightpushOptions `group:"LightPush Options"`
DiscV5 DiscV5Options `group:"DiscoveryV5 Options"`
Rendezvous RendezvousOptions `group:"Rendezvous Options"`
RendezvousServer RendezvousServerOptions `group:"Rendezvous Server Options"`
DNSDiscovery DNSDiscoveryOptions `group:"DNS Discovery Options"`

View File

@ -3,6 +3,8 @@ package discv5
import (
"context"
"crypto/ecdsa"
"fmt"
"math"
"math/rand"
"net"
"sync"
@ -21,6 +23,8 @@ import (
var log = logging.Logger("waku_discv5")
type DiscoveryV5 struct {
sync.Mutex
discovery.Discovery
params *discV5Parameters
@ -45,9 +49,11 @@ type peerRecord struct {
}
type discV5Parameters struct {
bootnodes []*enode.Node
advertiseAddress *net.IP
udpPort int
autoUpdate bool
bootnodes []*enode.Node
udpPort int
tcpPort int
advertiseAddr *net.IP
}
const WakuENRField = "waku2"
@ -57,15 +63,21 @@ type WakuEnrBitfield = uint8
type DiscoveryV5Option func(*discV5Parameters)
func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.autoUpdate = autoUpdate
}
}
func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.bootnodes = bootnodes
}
}
func WithAdvertiseAddress(advertiseAddr net.IP) DiscoveryV5Option {
func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.advertiseAddress = &advertiseAddr
params.advertiseAddr = &addr
}
}
@ -111,7 +123,9 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv
opt(params)
}
localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddress)
params.tcpPort = tcpPort
localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddr)
if err != nil {
return nil, err
}
@ -129,7 +143,7 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv
Bootnodes: params.bootnodes,
},
udpAddr: &net.UDPAddr{
IP: ipAddr,
IP: net.IPv4zero,
Port: params.udpPort,
},
}, nil
@ -140,25 +154,31 @@ func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort in
if err != nil {
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(WakuENRField, wakuFlags))
localnode.Set(enr.IP(ipAddr))
localnode.Set(enr.IP(ipAddr)) // Test if IP changes in p2p/enode/localnode.go ?
localnode.Set(enr.UDP(udpPort))
localnode.Set(enr.TCP(tcpPort))
if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("could not set udpPort ", udpPort)
}
if tcpPort > 0 && tcpPort <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(tcpPort))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("could not set tcpPort ", tcpPort)
}
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
return localnode, nil
}
func (d *DiscoveryV5) Start() error {
func (d *DiscoveryV5) listen() error {
conn, err := net.ListenUDP("udp", d.udpAddr)
if err != nil {
return err
@ -171,13 +191,80 @@ func (d *DiscoveryV5) Start() error {
d.listener = listener
log.Info("Started Discovery V5 at %s:%d", d.udpAddr.IP, d.udpAddr.Port)
return nil
}
func (d *DiscoveryV5) Start() error {
d.Lock()
defer d.Unlock()
err := d.listen()
if err != nil {
return err
}
log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort))
log.Info("Discovery V5 ", d.localnode.Node())
return nil
}
func (d *DiscoveryV5) Stop() {
d.Lock()
defer d.Unlock()
d.listener.Close()
d.listener = nil
log.Info("Stopped Discovery V5")
}
// IsPrivate reports whether ip is a private address, according to
// RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses).
// Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go
// Copyright (c) The Go Authors. All rights reserved.
// @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language
func IsPrivate(ip net.IP) bool {
if ip4 := ip.To4(); ip4 != nil {
// Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following three blocks of the IPv4 address space for private internets:
// 10.0.0.0 - 10.255.255.255 (10/8 prefix)
// 172.16.0.0 - 172.31.255.255 (172.16/12 prefix)
// 192.168.0.0 - 192.168.255.255 (192.168/16 prefix)
return ip4[0] == 10 ||
(ip4[0] == 172 && ip4[1]&0xf0 == 16) ||
(ip4[0] == 192 && ip4[1] == 168)
}
// Following RFC 4193, Section 3. Private Address Space which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following block of the IPv6 address space for local internets:
// FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix)
return len(ip) == net.IPv6len && ip[0]&0xfe == 0xfc
}
func (d *DiscoveryV5) UpdateAddr(addr net.IP) error {
if !d.params.autoUpdate {
return nil
}
d.Lock()
defer d.Unlock()
if addr.IsUnspecified() || d.localnode.Node().IP().Equal(addr) {
return nil
}
// TODO: improve this logic to determine if an address should be replaced or not
if !(d.localnode.Node().IP().IsLoopback() && IsPrivate(addr)) && !(IsPrivate(d.localnode.Node().IP()) && !addr.IsLoopback() && !IsPrivate(addr)) {
return nil
}
d.localnode.Set(enr.IP(addr))
log.Info(fmt.Sprintf("Updated Discovery V5 node IP: %s", d.localnode.Node().IP()))
log.Info("Discovery V5 ", d.localnode.Node())
return nil
}
func isWakuNode(node *enode.Node) bool {
@ -218,6 +305,7 @@ func (d *DiscoveryV5) evaluateNode(node *enode.Node) bool {
}
_, err := utils.EnodeToPeerInfo(node)
if err != nil {
log.Error("could not obtain peer info from enode:", err)
return false
@ -239,12 +327,16 @@ func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discover
return 20 * time.Minute, nil
}
func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan struct{}) {
func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int, doneCh chan struct{}) {
for {
if len(d.peerCache.recs) >= limit {
break
}
if ctx.Err() != nil {
break
}
exists := iterator.Next()
if !exists {
break
@ -309,18 +401,22 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
cacheSize := d.removeExpiredPeers()
// Discover new records if we don't have enough
if cacheSize < limit {
if cacheSize < limit && d.listener != nil {
d.Lock()
iterator := d.listener.RandomNodes()
iterator = enode.Filter(iterator, d.evaluateNode)
defer iterator.Close()
doneCh := make(chan struct{})
go d.iterate(iterator, limit, doneCh)
go d.iterate(ctx, iterator, limit, doneCh)
select {
case <-ctx.Done():
case <-doneCh:
}
d.Unlock()
}
// Randomize and fill channel with available records

View File

@ -43,7 +43,7 @@ func TestConnectionStatusChanges(t *testing.T) {
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
node1, err := New(ctx,
WithHostAddress([]*net.TCPAddr{hostAddr1}),
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithConnectionStatusChannel(connStatusChan),
)
@ -55,7 +55,7 @@ func TestConnectionStatusChanges(t *testing.T) {
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
node2, err := New(ctx,
WithHostAddress([]*net.TCPAddr{hostAddr2}),
WithHostAddress(hostAddr2),
WithWakuRelay(),
)
require.NoError(t, err)
@ -66,7 +66,7 @@ func TestConnectionStatusChanges(t *testing.T) {
hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
node3, err := New(ctx,
WithHostAddress([]*net.TCPAddr{hostAddr3}),
WithHostAddress(hostAddr3),
WithWakuRelay(),
WithWakuStore(false, false),
)

View File

@ -3,6 +3,8 @@ package node
import (
"context"
"fmt"
"net"
"strconv"
"time"
logging "github.com/ipfs/go-log"
@ -21,6 +23,7 @@ import (
rendezvous "github.com/status-im/go-waku-rendezvous"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/discv5"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
@ -50,11 +53,16 @@ type WakuNode struct {
rendezvous *rendezvous.RendezvousService
store *store.WakuStore
addrChan chan ma.Multiaddr
discoveryV5 *discv5.DiscoveryV5
bcaster v2.Broadcaster
connectionNotif ConnectionNotifier
protocolEventSub event.Subscription
identificationEventSub event.Subscription
addressChangesSub event.Subscription
ctx context.Context
cancel context.CancelFunc
@ -72,6 +80,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
params.libP2POpts = DefaultLibP2POptions
opts = append(DefaultWakuNodeOptions, opts...)
for _, opt := range opts {
err := opt(params)
if err != nil {
@ -80,6 +89,14 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
}
// Setting default host address if none was provided
if params.hostAddr == nil {
err := WithHostAddress(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})(params)
if err != nil {
cancel()
return nil, err
}
}
if len(params.multiAddr) > 0 {
params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...))
}
@ -105,6 +122,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.ctx = ctx
w.opts = params
w.quit = make(chan struct{})
w.addrChan = make(chan ma.Multiaddr, 1024)
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
return nil, err
@ -114,6 +132,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err
}
if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil {
return nil, err
}
if params.connStatusC != nil {
w.connStatusChan = params.connStatusC
}
@ -127,13 +149,69 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.startKeepAlive(w.opts.keepAliveInterval)
}
for _, addr := range w.ListenAddresses() {
log.Info("Listening on ", addr)
}
go w.checkForAddressChanges()
go w.onAddrChange()
return w, nil
}
func (w *WakuNode) onAddrChange() {
for m := range w.addrChan {
ipStr, err := m.ValueForProtocol(ma.P_IP4)
if err != nil {
log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error()))
continue
}
ip := net.ParseIP(ipStr)
if !ip.IsLoopback() && !ip.IsUnspecified() {
if w.opts.enableDiscV5 {
err := w.discoveryV5.UpdateAddr(ip)
if err != nil {
log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s: %s", ip, err.Error()))
continue
}
}
}
}
}
func (w *WakuNode) checkForAddressChanges() {
addrs := w.ListenAddresses()
first := make(chan struct{}, 1)
first <- struct{}{}
for {
select {
case <-w.quit:
return
case <-first:
for _, addr := range addrs {
log.Info("Listening on ", addr)
}
case <-w.addressChangesSub.Out():
newAddrs := w.ListenAddresses()
print := false
if len(addrs) != len(newAddrs) {
print = true
} else {
for i := range newAddrs {
if addrs[i].String() != newAddrs[i].String() {
print = true
break
}
}
}
if print {
addrs = newAddrs
log.Warn("Change in host multiaddresses")
for _, addr := range newAddrs {
w.addrChan <- addr
log.Warn("Listening on ", addr)
}
}
}
}
}
func (w *WakuNode) Start() error {
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
if w.opts.enableStore {
@ -149,6 +227,17 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...))
}
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
if err != nil {
return err
}
}
if w.opts.enableDiscV5 {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
}
err := w.mountRelay(w.opts.wOpts...)
if err != nil {
return err
@ -186,12 +275,14 @@ func (w *WakuNode) Stop() {
defer w.cancel()
close(w.quit)
close(w.addrChan)
w.bcaster.Close()
defer w.connectionNotif.Close()
defer w.protocolEventSub.Close()
defer w.identificationEventSub.Close()
defer w.addressChangesSub.Close()
if w.rendezvous != nil {
w.rendezvous.Stop()
@ -241,6 +332,10 @@ func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
return w.lightPush
}
func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 {
return w.discoveryV5
}
func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...)
@ -260,6 +355,41 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
return err
}
func (w *WakuNode) mountDiscV5() error {
wakuFlag := discv5.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
discV5Options := []discv5.DiscoveryV5Option{
discv5.WithBootnodes(w.opts.discV5bootnodes),
discv5.WithUDPPort(w.opts.udpPort),
discv5.WithAutoUpdate(w.opts.discV5autoUpdate),
}
addr := w.ListenAddresses()[0]
ipStr, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
return err
}
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
return err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return err
}
discoveryV5, err := discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, wakuFlag, discV5Options...)
if err != nil {
return err
}
w.discoveryV5 = discoveryV5
return nil
}
func (w *WakuNode) mountRendezvous() error {
w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage)
@ -463,9 +593,9 @@ func pingPeer(ctx context.Context, host host.Host, peer peer.ID) {
select {
case res := <-pr:
if res.Error != nil {
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
}
case <-ctx.Done():
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
}
}

View File

@ -22,7 +22,7 @@ func TestWakuNode2(t *testing.T) {
wakuNode, err := New(ctx,
WithPrivateKey(prvKey),
WithHostAddress([]*net.TCPAddr{hostAddr}),
WithHostAddress(hostAddr),
WithWakuRelay(),
)
require.NoError(t, err)

View File

@ -6,6 +6,7 @@ import (
"net"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
@ -23,9 +24,11 @@ import (
const clientId string = "Go Waku v2 node"
type WakuNodeParameters struct {
hostAddr *net.TCPAddr
advertiseAddr *net.IP
multiAddr []ma.Multiaddr
addressFactory basichost.AddrsFactory
privKey *crypto.PrivKey
privKey *ecdsa.PrivateKey
libP2POpts []libp2p.Option
enableRelay bool
@ -45,6 +48,12 @@ type WakuNodeParameters struct {
rendevousStorage rendezvous.Storage
rendezvousOpts []pubsub.DiscoverOpt
enableDiscV5 bool
udpPort int
discV5bootnodes []*enode.Node
discV5Opts []pubsub.DiscoverOpt
discV5autoUpdate bool
keepAliveInterval time.Duration
enableLightPush bool
@ -54,6 +63,11 @@ type WakuNodeParameters struct {
type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithWakuRelay(),
}
// MultiAddresses return the list of multiaddresses configured in the node
func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr {
return w.multiAddr
@ -61,39 +75,43 @@ func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr {
// Identity returns a libp2p option containing the identity used by the node
func (w WakuNodeParameters) Identity() config.Option {
return libp2p.Identity(*w.privKey)
return libp2p.Identity(*w.GetPrivKey())
}
// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a list of net endpoint addresses
func WithHostAddress(hostAddr []*net.TCPAddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
var multiAddresses []ma.Multiaddr
for _, addr := range hostAddr {
hostAddrMA, err := manet.FromNetAddr(addr)
if err != nil {
return err
}
multiAddresses = append(multiAddresses, hostAddrMA)
}
func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory {
return w.addressFactory
}
params.multiAddr = append(params.multiAddr, multiAddresses...)
// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a specific address
func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.hostAddr = hostAddr
hostAddrMA, err := manet.FromNetAddr(hostAddr)
if err != nil {
return err
}
params.multiAddr = append(params.multiAddr, hostAddrMA)
return nil
}
}
// WithAdvertiseAddress is a WakuNodeOption that allows overriding the addresses used in the waku node with custom values
func WithAdvertiseAddress(addressesToAdvertise []*net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption {
// WithAdvertiseAddress is a WakuNodeOption that allows overriding the address used in the waku node with custom value
func WithAdvertiseAddress(address *net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.advertiseAddr = &address.IP
advertiseAddress, err := manet.FromNetAddr(address)
if err != nil {
return err
}
params.addressFactory = func([]ma.Multiaddr) []ma.Multiaddr {
var result []multiaddr.Multiaddr
for _, adv := range addressesToAdvertise {
addr, _ := manet.FromNetAddr(adv)
result = append(result, addr)
if enableWS {
wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", adv.IP.String(), wsPort))
result = append(result, wsMa)
}
result = append(result, advertiseAddress)
if enableWS {
wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", address, wsPort))
result = append(result, wsMa)
}
return result
}
@ -112,12 +130,16 @@ func WithMultiaddress(addresses []ma.Multiaddr) WakuNodeOption {
// WithPrivateKey is used to set an ECDSA private key in a libp2p node
func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption {
return func(params *WakuNodeParameters) error {
privk := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey))
params.privKey = &privk
params.privKey = privKey
return nil
}
}
func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey {
privKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(w.privKey))
return &privKey
}
// WithLibP2POptions is a WakuNodeOption used to configure the libp2p node.
// This can potentially override any libp2p config that was set with other
// WakuNodeOption
@ -138,6 +160,18 @@ func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {
}
}
// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableDiscV5 = true
params.udpPort = udpPort
params.discV5bootnodes = bootnodes
params.discV5Opts = discoverOpts
params.discV5autoUpdate = autoUpdate
return nil
}
}
// WithRendezvous is a WakuOption used to enable go-waku-rendezvous discovery.
// It accepts an optional list of DiscoveryOpt options
func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
@ -232,7 +266,6 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
var DefaultLibP2POptions = []libp2p.Option{
libp2p.DefaultTransports,
libp2p.UserAgent(clientId),
libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts.
libp2p.EnableNATService(), // TODO: is this needed?)
libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)),
}

View File

@ -26,12 +26,11 @@ func TestWakuOptions(t *testing.T) {
addr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4000/ws")
require.NoError(t, err)
advertiseAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4000")
require.NoError(t, err)
advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
options := []WakuNodeOption{
WithHostAddress([]*net.TCPAddr{hostAddr}),
WithAdvertiseAddress([]*net.TCPAddr{advertiseAddr}, false, 4000),
WithHostAddress(hostAddr),
WithAdvertiseAddress(advertiseAddr, false, 4000),
WithMultiaddress([]multiaddr.Multiaddr{addr}),
WithPrivateKey(prvKey),
WithLibP2POptions(),
@ -39,6 +38,7 @@ func TestWakuOptions(t *testing.T) {
WithRendezvous(),
WithRendezvousServer(rendezvous.NewStorage(nil)),
WithWakuFilter(true),
WithDiscoveryV5(123, nil, false),
WithWakuStore(true, true),
WithWakuStoreAndRetentionPolicy(true, time.Hour, 100),
WithMessageProvider(nil),

View File

@ -6,6 +6,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
)
type AdminService struct {
@ -52,6 +56,10 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su
return nil
}
func isWakuProtocol(protocol string) bool {
return protocol == string(filter.FilterID_v20beta1) || protocol == string(relay.WakuRelayID_v200) || protocol == string(lightpush.LightPushID_v20beta1) || protocol == string(store.StoreID_v20beta3)
}
func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error {
peers, err := a.node.Peers()
if err != nil {
@ -59,12 +67,17 @@ func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *
return nil
}
for _, peer := range peers {
for idx, addr := range peer.Addrs {
reply.Peers = append(reply.Peers, PeerReply{
Multiaddr: addr.String(),
Protocol: peer.Protocols[idx],
Connected: peer.Connected,
})
for _, addr := range peer.Addrs {
for _, proto := range peer.Protocols {
if !isWakuProtocol(proto) {
continue
}
reply.Peers = append(reply.Peers, PeerReply{
Multiaddr: addr.String(),
Protocol: proto,
Connected: peer.Connected,
})
}
}
}
return nil

View File

@ -7,11 +7,13 @@ import (
"fmt"
"net/http"
"testing"
"time"
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/stretchr/testify/require"
)
@ -30,6 +32,9 @@ func TestV1Peers(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, nil)
require.NoError(t, err)
defer relay.Stop()
var reply PeersReply
@ -56,6 +61,8 @@ func TestV1Peers(t *testing.T) {
require.NoError(t, err)
require.True(t, reply2.Success)
time.Sleep(2 * time.Second)
err = a.GetV1Peers(request, &GetPeersArgs{}, &reply)
require.NoError(t, err)
require.Len(t, reply.Peers, 2)