From 817759c235621206443e91fe0b96dcaaae93aac5 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 17 Nov 2021 12:19:42 -0400 Subject: [PATCH] feat: discoveryV5 - part2 (#150) --- examples/basic2/main.go | 2 +- examples/chat2/main.go | 2 +- examples/filter2/main.go | 4 +- tests/connection_test.go | 2 +- waku/node.go | 102 ++++++++++++++------- waku/options.go | 13 ++- waku/v2/discv5/discover.go | 132 +++++++++++++++++++++++---- waku/v2/node/connectedness_test.go | 6 +- waku/v2/node/wakunode2.go | 140 +++++++++++++++++++++++++++-- waku/v2/node/wakunode2_test.go | 2 +- waku/v2/node/wakuoptions.go | 85 ++++++++++++------ waku/v2/node/wakuoptions_test.go | 8 +- waku/v2/rpc/admin.go | 25 ++++-- waku/v2/rpc/admin_test.go | 7 ++ 14 files changed, 429 insertions(+), 101 deletions(-) diff --git a/examples/basic2/main.go b/examples/basic2/main.go index 96cdb229..59af31d1 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -45,7 +45,7 @@ func main() { wakuNode, err := node.New(ctx, node.WithPrivateKey(prvKey), - node.WithHostAddress([]*net.TCPAddr{hostAddr}), + node.WithHostAddress(hostAddr), node.WithWakuRelay(), ) if err != nil { diff --git a/examples/chat2/main.go b/examples/chat2/main.go index 91036a1b..1ca52795 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -79,7 +79,7 @@ func main() { opts := []node.WakuNodeOption{ node.WithPrivateKey(prvKey), - node.WithHostAddress([]*net.TCPAddr{hostAddr}), + node.WithHostAddress(hostAddr), node.WithWakuStore(false, false), node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second), } diff --git a/examples/filter2/main.go b/examples/filter2/main.go index aaf9fd23..da4c717f 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -64,7 +64,7 @@ func main() { fullNode, err := node.New(ctx, node.WithPrivateKey(prvKey1), - node.WithHostAddress([]*net.TCPAddr{hostAddr1}), + node.WithHostAddress(hostAddr1), node.WithWakuRelay(), node.WithWakuFilter(true), ) @@ -76,7 +76,7 @@ func main() { lightNode, err := node.New(ctx, node.WithPrivateKey(prvKey2), - node.WithHostAddress([]*net.TCPAddr{hostAddr2}), + node.WithHostAddress(hostAddr2), node.WithWakuFilter(false), ) if err != nil { diff --git a/tests/connection_test.go b/tests/connection_test.go index 003535c4..edcf0374 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -27,7 +27,7 @@ func TestBasicSendingReceiving(t *testing.T) { wakuNode, err := node.New(ctx, node.WithPrivateKey(prvKey), - node.WithHostAddress([]*net.TCPAddr{hostAddr}), + node.WithHostAddress(hostAddr), node.WithWakuRelay(), ) require.NoError(t, err) diff --git a/waku/node.go b/waku/node.go index 9ddfb2bb..b6c26a6e 100644 --- a/waku/node.go +++ b/waku/node.go @@ -17,7 +17,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" dssql "github.com/ipfs/go-ds-sql" - manet "github.com/multiformats/go-multiaddr/net" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" @@ -53,6 +52,26 @@ func failOnErr(err error, msg string) { } } +func freePort() (int, error) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return 0, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return 0, err + } + + port := l.Addr().(*net.TCPAddr).Port + err = l.Close() + if err != nil { + return 0, err + } + + return port, nil +} + // Execute starts a go-waku node with settings determined by the Options parameter func Execute(options Options) { if options.GenerateKey { @@ -89,14 +108,25 @@ func Execute(options Options) { nodeOpts := []node.WakuNodeOption{ node.WithPrivateKey(prvKey), - node.WithHostAddress([]*net.TCPAddr{hostAddr}), + node.WithHostAddress(hostAddr), node.WithKeepAlive(time.Duration(options.KeepAlive) * time.Second), } if options.AdvertiseAddress != "" { advertiseAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", options.AdvertiseAddress, options.Port)) - failOnErr(err, "invalid advertise address") - nodeOpts = append(nodeOpts, node.WithAdvertiseAddress([]*net.TCPAddr{advertiseAddr}, options.EnableWS, options.WSPort)) + failOnErr(err, "Invalid advertise address") + + if advertiseAddr.Port == 0 { + for { + p, err := freePort() + if err == nil { + advertiseAddr.Port = p + break + } + } + } + + nodeOpts = append(nodeOpts, node.WithAdvertiseAddress(advertiseAddr, options.EnableWS, options.WSPort)) } if options.EnableWS { @@ -110,6 +140,9 @@ func Execute(options Options) { } libp2pOpts := node.DefaultLibP2POptions + if options.AdvertiseAddress == "" { + libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) // Attempt to open ports using uPNP for NATed hosts.) + } if options.UseDB { // Create persistent peerstore @@ -162,6 +195,18 @@ func Execute(options Options) { nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second)))) } + if options.DiscV5.Enable { + var bootnodes []*enode.Node + for _, addr := range options.DiscV5.Nodes { + bootnode, err := enode.Parse(enode.ValidSchemes, addr) + if err != nil { + log.Fatal("could not parse enr: ", err) + } + bootnodes = append(bootnodes, bootnode) + } + nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second)))) + } + wakuNode, err := node.New(ctx, nodeOpts...) failOnErr(err, "Wakunode") @@ -171,10 +216,25 @@ func Execute(options Options) { addPeers(wakuNode, options.LightPush.Nodes, lightpush.LightPushID_v20beta1) addPeers(wakuNode, options.Filter.Nodes, filter.FilterID_v20beta1) + if options.DNSDiscovery.Enable || options.DiscV5.Enable { + for _, addr := range wakuNode.ListenAddresses() { + ip, _ := addr.ValueForProtocol(multiaddr.P_IP4) + // TODO: use enode.New + enr := enode.NewV4(&prvKey.PublicKey, net.ParseIP(ip), hostAddr.Port, 0) + log.Info("ENR: ", enr) + } + } + if err = wakuNode.Start(); err != nil { log.Fatal(fmt.Errorf("could not start waku node, %w", err)) } + if options.DiscV5.Enable { + if err = wakuNode.DiscV5().Start(); err != nil { + log.Fatal(fmt.Errorf("could not start discovery v5, %w", err)) + } + } + if len(options.Relay.Topics) == 0 { options.Relay.Topics = []string{string(relay.DefaultWakuTopic)} } @@ -197,12 +257,6 @@ func Execute(options Options) { } if options.DNSDiscovery.Enable { - for _, addr := range wakuNode.ListenAddresses() { - ip, _ := addr.ValueForProtocol(multiaddr.P_IP4) - enr := enode.NewV4(&prvKey.PublicKey, net.ParseIP(ip), hostAddr.Port, 0) - log.Info("ENR: ", enr) - } - if options.DNSDiscovery.URL != "" { log.Info("attempting DNS discovery with ", options.DNSDiscovery.URL) multiaddresses, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver)) @@ -366,30 +420,16 @@ func printListeningAddresses(ctx context.Context, nodeOpts []node.WakuNodeOption } var libp2pOpts []config.Option - libp2pOpts = append(libp2pOpts, params.Identity()) + libp2pOpts = append(libp2pOpts, + params.Identity(), + libp2p.ListenAddrs(params.MultiAddresses()...), + ) - if options.AdvertiseAddress != "" { - advertiseAddress, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", options.AdvertiseAddress, options.Port)) - if err != nil { - panic(err) - } - - libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(func([]multiaddr.Multiaddr) []multiaddr.Multiaddr { - addr, _ := manet.FromNetAddr(advertiseAddress) - var result []multiaddr.Multiaddr - result = append(result, addr) - - if options.EnableWS { - wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", options.AdvertiseAddress, options.WSPort)) - result = append(result, wsMa) - } - - return result - })) + addrFactory := params.AddressFactory() + if addrFactory != nil { + libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrFactory)) } - libp2pOpts = append(libp2pOpts, libp2p.ListenAddrs(params.MultiAddresses()...)) - h, err := libp2p.New(ctx, libp2pOpts...) if err != nil { panic(err) diff --git a/waku/options.go b/waku/options.go index ae36d0f2..45811c28 100644 --- a/waku/options.go +++ b/waku/options.go @@ -6,11 +6,19 @@ type RendezvousOptions struct { Enable bool `long:"rendezvous" description:"Enable rendezvous protocol for peer discovery"` Nodes []string `long:"rendezvous-node" description:"Multiaddr of a waku2 rendezvous node. Option may be repeated"` } + type RendezvousServerOptions struct { Enable bool `long:"rendezvous-server" description:"Node will act as rendezvous server"` DBPath string `long:"rendezvous-db-path" description:"Path where peer records database will be stored" default:"/tmp/rendezvous"` } +type DiscV5Options struct { + Enable bool `long:"discv5-discovery" description:"Enable discovering nodes via Node Discovery v5"` + Nodes []string `long:"discv5-bootstrap-node" description:"Text-encoded ENR for bootstrap node. Used when connecting to the network. Option may be repeated"` + Port int `long:"discv5-udp-port" description:"Listening UDP port for Node Discovery v5." default:"9000"` + AutoUpdate bool `long:"discv5-enr-auto-update" description:"Discovery can automatically update its ENR with the IP address as seen by other nodes it communicates with." ` +} + type RelayOptions struct { Disable bool `long:"no-relay" description:"Disable relay protocol"` Topics []string `long:"topics" description:"List of topics to listen"` @@ -76,10 +84,10 @@ type RPCServerOptions struct { // Options contains all the available features and settings that can be // configured via flags when executing go-waku as a service. type Options struct { - Port int `short:"p" long:"port" description:"Libp2p TCP listening port (0 for random)" default:"9000"` + Port int `short:"p" long:"port" description:"Libp2p TCP listening port (0 for random)" default:"60000"` Address string `long:"address" description:"Listening address" default:"0.0.0.0"` EnableWS bool `long:"ws" description:"Enable websockets support"` - WSPort int `long:"ws-port" description:"Libp2p TCP listening port for websocket connection (0 for random)" default:"9001"` + WSPort int `long:"ws-port" description:"Libp2p TCP listening port for websocket connection (0 for random)" default:"60001"` WSAddress string `long:"ws-address" description:"Listening address for websocket connections" default:"0.0.0.0"` NodeKey string `long:"nodekey" description:"P2P node private key as hex. Can also be set with GOWAKU-NODEKEY env variable (default random)"` KeyFile string `long:"key-file" description:"Path to a file containing the private key for the P2P node" default:"./nodekey"` @@ -97,6 +105,7 @@ type Options struct { Store StoreOptions `group:"Store Options"` Filter FilterOptions `group:"Filter Options"` LightPush LightpushOptions `group:"LightPush Options"` + DiscV5 DiscV5Options `group:"DiscoveryV5 Options"` Rendezvous RendezvousOptions `group:"Rendezvous Options"` RendezvousServer RendezvousServerOptions `group:"Rendezvous Server Options"` DNSDiscovery DNSDiscoveryOptions `group:"DNS Discovery Options"` diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 27c86e89..4faacbc2 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -3,6 +3,8 @@ package discv5 import ( "context" "crypto/ecdsa" + "fmt" + "math" "math/rand" "net" "sync" @@ -21,6 +23,8 @@ import ( var log = logging.Logger("waku_discv5") type DiscoveryV5 struct { + sync.Mutex + discovery.Discovery params *discV5Parameters @@ -45,9 +49,11 @@ type peerRecord struct { } type discV5Parameters struct { - bootnodes []*enode.Node - advertiseAddress *net.IP - udpPort int + autoUpdate bool + bootnodes []*enode.Node + udpPort int + tcpPort int + advertiseAddr *net.IP } const WakuENRField = "waku2" @@ -57,15 +63,21 @@ type WakuEnrBitfield = uint8 type DiscoveryV5Option func(*discV5Parameters) +func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.autoUpdate = autoUpdate + } +} + func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option { return func(params *discV5Parameters) { params.bootnodes = bootnodes } } -func WithAdvertiseAddress(advertiseAddr net.IP) DiscoveryV5Option { +func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option { return func(params *discV5Parameters) { - params.advertiseAddress = &advertiseAddr + params.advertiseAddr = &addr } } @@ -111,7 +123,9 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv opt(params) } - localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddress) + params.tcpPort = tcpPort + + localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddr) if err != nil { return nil, err } @@ -129,7 +143,7 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv Bootnodes: params.bootnodes, }, udpAddr: &net.UDPAddr{ - IP: ipAddr, + IP: net.IPv4zero, Port: params.udpPort, }, }, nil @@ -140,25 +154,31 @@ func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort in if err != nil { return nil, err } - localnode := enode.NewLocalNode(db, priv) localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) localnode.SetFallbackUDP(udpPort) - localnode.Set(enr.WithEntry(WakuENRField, wakuFlags)) + localnode.Set(enr.IP(ipAddr)) - localnode.Set(enr.IP(ipAddr)) // Test if IP changes in p2p/enode/localnode.go ? - localnode.Set(enr.UDP(udpPort)) - localnode.Set(enr.TCP(tcpPort)) + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("could not set udpPort ", udpPort) + } + + if tcpPort > 0 && tcpPort <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(tcpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("could not set tcpPort ", tcpPort) + } if advertiseAddr != nil { localnode.SetStaticIP(*advertiseAddr) } - return localnode, nil } -func (d *DiscoveryV5) Start() error { +func (d *DiscoveryV5) listen() error { conn, err := net.ListenUDP("udp", d.udpAddr) if err != nil { return err @@ -171,13 +191,80 @@ func (d *DiscoveryV5) Start() error { d.listener = listener - log.Info("Started Discovery V5 at %s:%d", d.udpAddr.IP, d.udpAddr.Port) + return nil +} + +func (d *DiscoveryV5) Start() error { + d.Lock() + defer d.Unlock() + + err := d.listen() + if err != nil { + return err + } + + log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort)) + log.Info("Discovery V5 ", d.localnode.Node()) return nil } func (d *DiscoveryV5) Stop() { + d.Lock() + defer d.Unlock() + d.listener.Close() + d.listener = nil + log.Info("Stopped Discovery V5") +} + +// IsPrivate reports whether ip is a private address, according to +// RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses). +// Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go +// Copyright (c) The Go Authors. All rights reserved. +// @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language +func IsPrivate(ip net.IP) bool { + if ip4 := ip.To4(); ip4 != nil { + // Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says: + // The Internet Assigned Numbers Authority (IANA) has reserved the + // following three blocks of the IPv4 address space for private internets: + // 10.0.0.0 - 10.255.255.255 (10/8 prefix) + // 172.16.0.0 - 172.31.255.255 (172.16/12 prefix) + // 192.168.0.0 - 192.168.255.255 (192.168/16 prefix) + return ip4[0] == 10 || + (ip4[0] == 172 && ip4[1]&0xf0 == 16) || + (ip4[0] == 192 && ip4[1] == 168) + } + // Following RFC 4193, Section 3. Private Address Space which says: + // The Internet Assigned Numbers Authority (IANA) has reserved the + // following block of the IPv6 address space for local internets: + // FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix) + return len(ip) == net.IPv6len && ip[0]&0xfe == 0xfc +} + +func (d *DiscoveryV5) UpdateAddr(addr net.IP) error { + if !d.params.autoUpdate { + return nil + } + + d.Lock() + defer d.Unlock() + + if addr.IsUnspecified() || d.localnode.Node().IP().Equal(addr) { + return nil + } + + // TODO: improve this logic to determine if an address should be replaced or not + if !(d.localnode.Node().IP().IsLoopback() && IsPrivate(addr)) && !(IsPrivate(d.localnode.Node().IP()) && !addr.IsLoopback() && !IsPrivate(addr)) { + return nil + } + + d.localnode.Set(enr.IP(addr)) + + log.Info(fmt.Sprintf("Updated Discovery V5 node IP: %s", d.localnode.Node().IP())) + log.Info("Discovery V5 ", d.localnode.Node()) + + return nil } func isWakuNode(node *enode.Node) bool { @@ -218,6 +305,7 @@ func (d *DiscoveryV5) evaluateNode(node *enode.Node) bool { } _, err := utils.EnodeToPeerInfo(node) + if err != nil { log.Error("could not obtain peer info from enode:", err) return false @@ -239,12 +327,16 @@ func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discover return 20 * time.Minute, nil } -func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan struct{}) { +func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int, doneCh chan struct{}) { for { if len(d.peerCache.recs) >= limit { break } + if ctx.Err() != nil { + break + } + exists := iterator.Next() if !exists { break @@ -309,18 +401,22 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco cacheSize := d.removeExpiredPeers() // Discover new records if we don't have enough - if cacheSize < limit { + if cacheSize < limit && d.listener != nil { + d.Lock() + iterator := d.listener.RandomNodes() iterator = enode.Filter(iterator, d.evaluateNode) defer iterator.Close() doneCh := make(chan struct{}) - go d.iterate(iterator, limit, doneCh) + go d.iterate(ctx, iterator, limit, doneCh) select { case <-ctx.Done(): case <-doneCh: } + + d.Unlock() } // Randomize and fill channel with available records diff --git a/waku/v2/node/connectedness_test.go b/waku/v2/node/connectedness_test.go index be26ca6f..5054af39 100644 --- a/waku/v2/node/connectedness_test.go +++ b/waku/v2/node/connectedness_test.go @@ -43,7 +43,7 @@ func TestConnectionStatusChanges(t *testing.T) { hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") require.NoError(t, err) node1, err := New(ctx, - WithHostAddress([]*net.TCPAddr{hostAddr1}), + WithHostAddress(hostAddr1), WithWakuRelay(), WithConnectionStatusChannel(connStatusChan), ) @@ -55,7 +55,7 @@ func TestConnectionStatusChanges(t *testing.T) { hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") require.NoError(t, err) node2, err := New(ctx, - WithHostAddress([]*net.TCPAddr{hostAddr2}), + WithHostAddress(hostAddr2), WithWakuRelay(), ) require.NoError(t, err) @@ -66,7 +66,7 @@ func TestConnectionStatusChanges(t *testing.T) { hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") require.NoError(t, err) node3, err := New(ctx, - WithHostAddress([]*net.TCPAddr{hostAddr3}), + WithHostAddress(hostAddr3), WithWakuRelay(), WithWakuStore(false, false), ) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 81cd882c..5cb73b48 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -3,6 +3,8 @@ package node import ( "context" "fmt" + "net" + "strconv" "time" logging "github.com/ipfs/go-log" @@ -21,6 +23,7 @@ import ( rendezvous "github.com/status-im/go-waku-rendezvous" v2 "github.com/status-im/go-waku/waku/v2" + "github.com/status-im/go-waku/waku/v2/discv5" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" @@ -50,11 +53,16 @@ type WakuNode struct { rendezvous *rendezvous.RendezvousService store *store.WakuStore + addrChan chan ma.Multiaddr + + discoveryV5 *discv5.DiscoveryV5 + bcaster v2.Broadcaster connectionNotif ConnectionNotifier protocolEventSub event.Subscription identificationEventSub event.Subscription + addressChangesSub event.Subscription ctx context.Context cancel context.CancelFunc @@ -72,6 +80,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { params.libP2POpts = DefaultLibP2POptions + opts = append(DefaultWakuNodeOptions, opts...) for _, opt := range opts { err := opt(params) if err != nil { @@ -80,6 +89,14 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } } + // Setting default host address if none was provided + if params.hostAddr == nil { + err := WithHostAddress(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})(params) + if err != nil { + cancel() + return nil, err + } + } if len(params.multiAddr) > 0 { params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...)) } @@ -105,6 +122,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.ctx = ctx w.opts = params w.quit = make(chan struct{}) + w.addrChan = make(chan ma.Multiaddr, 1024) if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { return nil, err @@ -114,6 +132,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } + if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil { + return nil, err + } + if params.connStatusC != nil { w.connStatusChan = params.connStatusC } @@ -127,13 +149,69 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.startKeepAlive(w.opts.keepAliveInterval) } - for _, addr := range w.ListenAddresses() { - log.Info("Listening on ", addr) - } + go w.checkForAddressChanges() + go w.onAddrChange() return w, nil } +func (w *WakuNode) onAddrChange() { + for m := range w.addrChan { + ipStr, err := m.ValueForProtocol(ma.P_IP4) + if err != nil { + log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error())) + continue + } + ip := net.ParseIP(ipStr) + if !ip.IsLoopback() && !ip.IsUnspecified() { + if w.opts.enableDiscV5 { + err := w.discoveryV5.UpdateAddr(ip) + if err != nil { + log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s: %s", ip, err.Error())) + continue + } + } + } + } +} + +func (w *WakuNode) checkForAddressChanges() { + addrs := w.ListenAddresses() + first := make(chan struct{}, 1) + first <- struct{}{} + for { + select { + case <-w.quit: + return + case <-first: + for _, addr := range addrs { + log.Info("Listening on ", addr) + } + case <-w.addressChangesSub.Out(): + newAddrs := w.ListenAddresses() + print := false + if len(addrs) != len(newAddrs) { + print = true + } else { + for i := range newAddrs { + if addrs[i].String() != newAddrs[i].String() { + print = true + break + } + } + } + if print { + addrs = newAddrs + log.Warn("Change in host multiaddresses") + for _, addr := range newAddrs { + w.addrChan <- addr + log.Warn("Listening on ", addr) + } + } + } + } +} + func (w *WakuNode) Start() error { w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) if w.opts.enableStore { @@ -149,6 +227,17 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...)) } + if w.opts.enableDiscV5 { + err := w.mountDiscV5() + if err != nil { + return err + } + } + + if w.opts.enableDiscV5 { + w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...)) + } + err := w.mountRelay(w.opts.wOpts...) if err != nil { return err @@ -186,12 +275,14 @@ func (w *WakuNode) Stop() { defer w.cancel() close(w.quit) + close(w.addrChan) w.bcaster.Close() defer w.connectionNotif.Close() defer w.protocolEventSub.Close() defer w.identificationEventSub.Close() + defer w.addressChangesSub.Close() if w.rendezvous != nil { w.rendezvous.Stop() @@ -241,6 +332,10 @@ func (w *WakuNode) Lightpush() *lightpush.WakuLightPush { return w.lightPush } +func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 { + return w.discoveryV5 +} + func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { var err error w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...) @@ -260,6 +355,41 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { return err } +func (w *WakuNode) mountDiscV5() error { + wakuFlag := discv5.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay) + + discV5Options := []discv5.DiscoveryV5Option{ + discv5.WithBootnodes(w.opts.discV5bootnodes), + discv5.WithUDPPort(w.opts.udpPort), + discv5.WithAutoUpdate(w.opts.discV5autoUpdate), + } + + addr := w.ListenAddresses()[0] + + ipStr, err := addr.ValueForProtocol(ma.P_IP4) + if err != nil { + return err + } + + portStr, err := addr.ValueForProtocol(ma.P_TCP) + if err != nil { + return err + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return err + } + + discoveryV5, err := discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, wakuFlag, discV5Options...) + if err != nil { + return err + } + + w.discoveryV5 = discoveryV5 + return nil +} + func (w *WakuNode) mountRendezvous() error { w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage) @@ -463,9 +593,9 @@ func pingPeer(ctx context.Context, host host.Host, peer peer.ID) { select { case res := <-pr: if res.Error != nil { - log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) + log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) } case <-ctx.Done(): - log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) + log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) } } diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index e8a56283..1ff18af0 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -22,7 +22,7 @@ func TestWakuNode2(t *testing.T) { wakuNode, err := New(ctx, WithPrivateKey(prvKey), - WithHostAddress([]*net.TCPAddr{hostAddr}), + WithHostAddress(hostAddr), WithWakuRelay(), ) require.NoError(t, err) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 1afd6e1a..06165237 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" @@ -23,9 +24,11 @@ import ( const clientId string = "Go Waku v2 node" type WakuNodeParameters struct { + hostAddr *net.TCPAddr + advertiseAddr *net.IP multiAddr []ma.Multiaddr addressFactory basichost.AddrsFactory - privKey *crypto.PrivKey + privKey *ecdsa.PrivateKey libP2POpts []libp2p.Option enableRelay bool @@ -45,6 +48,12 @@ type WakuNodeParameters struct { rendevousStorage rendezvous.Storage rendezvousOpts []pubsub.DiscoverOpt + enableDiscV5 bool + udpPort int + discV5bootnodes []*enode.Node + discV5Opts []pubsub.DiscoverOpt + discV5autoUpdate bool + keepAliveInterval time.Duration enableLightPush bool @@ -54,6 +63,11 @@ type WakuNodeParameters struct { type WakuNodeOption func(*WakuNodeParameters) error +// Default options used in the libp2p node +var DefaultWakuNodeOptions = []WakuNodeOption{ + WithWakuRelay(), +} + // MultiAddresses return the list of multiaddresses configured in the node func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr { return w.multiAddr @@ -61,39 +75,43 @@ func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr { // Identity returns a libp2p option containing the identity used by the node func (w WakuNodeParameters) Identity() config.Option { - return libp2p.Identity(*w.privKey) + return libp2p.Identity(*w.GetPrivKey()) } -// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a list of net endpoint addresses -func WithHostAddress(hostAddr []*net.TCPAddr) WakuNodeOption { - return func(params *WakuNodeParameters) error { - var multiAddresses []ma.Multiaddr - for _, addr := range hostAddr { - hostAddrMA, err := manet.FromNetAddr(addr) - if err != nil { - return err - } - multiAddresses = append(multiAddresses, hostAddrMA) - } +func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory { + return w.addressFactory +} - params.multiAddr = append(params.multiAddr, multiAddresses...) +// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a specific address +func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.hostAddr = hostAddr + hostAddrMA, err := manet.FromNetAddr(hostAddr) + if err != nil { + return err + } + params.multiAddr = append(params.multiAddr, hostAddrMA) return nil } } -// WithAdvertiseAddress is a WakuNodeOption that allows overriding the addresses used in the waku node with custom values -func WithAdvertiseAddress(addressesToAdvertise []*net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption { +// WithAdvertiseAddress is a WakuNodeOption that allows overriding the address used in the waku node with custom value +func WithAdvertiseAddress(address *net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption { return func(params *WakuNodeParameters) error { + params.advertiseAddr = &address.IP + + advertiseAddress, err := manet.FromNetAddr(address) + if err != nil { + return err + } + params.addressFactory = func([]ma.Multiaddr) []ma.Multiaddr { var result []multiaddr.Multiaddr - for _, adv := range addressesToAdvertise { - addr, _ := manet.FromNetAddr(adv) - result = append(result, addr) - if enableWS { - wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", adv.IP.String(), wsPort)) - result = append(result, wsMa) - } + result = append(result, advertiseAddress) + if enableWS { + wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", address, wsPort)) + result = append(result, wsMa) } return result } @@ -112,12 +130,16 @@ func WithMultiaddress(addresses []ma.Multiaddr) WakuNodeOption { // WithPrivateKey is used to set an ECDSA private key in a libp2p node func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption { return func(params *WakuNodeParameters) error { - privk := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) - params.privKey = &privk + params.privKey = privKey return nil } } +func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey { + privKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(w.privKey)) + return &privKey +} + // WithLibP2POptions is a WakuNodeOption used to configure the libp2p node. // This can potentially override any libp2p config that was set with other // WakuNodeOption @@ -138,6 +160,18 @@ func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption { } } +// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery +func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableDiscV5 = true + params.udpPort = udpPort + params.discV5bootnodes = bootnodes + params.discV5Opts = discoverOpts + params.discV5autoUpdate = autoUpdate + return nil + } +} + // WithRendezvous is a WakuOption used to enable go-waku-rendezvous discovery. // It accepts an optional list of DiscoveryOpt options func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { @@ -232,7 +266,6 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption { var DefaultLibP2POptions = []libp2p.Option{ libp2p.DefaultTransports, libp2p.UserAgent(clientId), - libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. libp2p.EnableNATService(), // TODO: is this needed?) libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)), } diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index ecea197f..3f678cab 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -26,12 +26,11 @@ func TestWakuOptions(t *testing.T) { addr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4000/ws") require.NoError(t, err) - advertiseAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4000") - require.NoError(t, err) + advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") options := []WakuNodeOption{ - WithHostAddress([]*net.TCPAddr{hostAddr}), - WithAdvertiseAddress([]*net.TCPAddr{advertiseAddr}, false, 4000), + WithHostAddress(hostAddr), + WithAdvertiseAddress(advertiseAddr, false, 4000), WithMultiaddress([]multiaddr.Multiaddr{addr}), WithPrivateKey(prvKey), WithLibP2POptions(), @@ -39,6 +38,7 @@ func TestWakuOptions(t *testing.T) { WithRendezvous(), WithRendezvousServer(rendezvous.NewStorage(nil)), WithWakuFilter(true), + WithDiscoveryV5(123, nil, false), WithWakuStore(true, true), WithWakuStoreAndRetentionPolicy(true, time.Hour, 100), WithMessageProvider(nil), diff --git a/waku/v2/rpc/admin.go b/waku/v2/rpc/admin.go index 32d6a9f3..4f3f016c 100644 --- a/waku/v2/rpc/admin.go +++ b/waku/v2/rpc/admin.go @@ -6,6 +6,10 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/filter" + "github.com/status-im/go-waku/waku/v2/protocol/lightpush" + "github.com/status-im/go-waku/waku/v2/protocol/relay" + "github.com/status-im/go-waku/waku/v2/protocol/store" ) type AdminService struct { @@ -52,6 +56,10 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su return nil } +func isWakuProtocol(protocol string) bool { + return protocol == string(filter.FilterID_v20beta1) || protocol == string(relay.WakuRelayID_v200) || protocol == string(lightpush.LightPushID_v20beta1) || protocol == string(store.StoreID_v20beta3) +} + func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error { peers, err := a.node.Peers() if err != nil { @@ -59,12 +67,17 @@ func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply * return nil } for _, peer := range peers { - for idx, addr := range peer.Addrs { - reply.Peers = append(reply.Peers, PeerReply{ - Multiaddr: addr.String(), - Protocol: peer.Protocols[idx], - Connected: peer.Connected, - }) + for _, addr := range peer.Addrs { + for _, proto := range peer.Protocols { + if !isWakuProtocol(proto) { + continue + } + reply.Peers = append(reply.Peers, PeerReply{ + Multiaddr: addr.String(), + Protocol: proto, + Connected: peer.Connected, + }) + } } } return nil diff --git a/waku/v2/rpc/admin_test.go b/waku/v2/rpc/admin_test.go index 9110f6e6..2b67fb29 100644 --- a/waku/v2/rpc/admin_test.go +++ b/waku/v2/rpc/admin_test.go @@ -7,11 +7,13 @@ import ( "fmt" "net/http" "testing" + "time" "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/stretchr/testify/require" ) @@ -30,6 +32,9 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) + relay, err := relay.NewWakuRelay(context.Background(), host, nil) + require.NoError(t, err) + defer relay.Stop() var reply PeersReply @@ -56,6 +61,8 @@ func TestV1Peers(t *testing.T) { require.NoError(t, err) require.True(t, reply2.Success) + time.Sleep(2 * time.Second) + err = a.GetV1Peers(request, &GetPeersArgs{}, &reply) require.NoError(t, err) require.Len(t, reply.Peers, 2)