diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 242832d15..b2bb6a4c6 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -43,6 +43,16 @@ func (w *gethWakuWrapper) PeerCount() int { return -1 } +// Added for compatibility with waku V2 +func (w *gethWakuWrapper) StartDiscV5() error { + return errors.New("not available in WakuV1") +} + +// Added for compatibility with waku V2 +func (w *gethWakuWrapper) StopDiscV5() error { + return errors.New("not available in WakuV1") +} + // PeerCount function only added for compatibility with waku V2 func (w *gethWakuWrapper) AddStorePeer(address string) (string, error) { return "", errors.New("not available in WakuV1") diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 71fa6d8de..6d0eb14e2 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -222,6 +222,14 @@ func (w *gethWakuV2Wrapper) RequestHistoricMessagesWithTimeout(peerID []byte, en return errors.New("DEPRECATED") } +func (w *gethWakuV2Wrapper) StartDiscV5() error { + return w.waku.StartDiscV5() +} + +func (w *gethWakuV2Wrapper) StopDiscV5() error { + return w.waku.StopDiscV5() +} + func (w *gethWakuV2Wrapper) AddStorePeer(address string) (string, error) { return w.waku.AddStorePeer(address) } diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index 8c52f7673..a4dcd2f52 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -18,6 +18,10 @@ type Waku interface { Peers() map[string][]string + StartDiscV5() error + + StopDiscV5() error + AddStorePeer(address string) (string, error) AddRelayPeer(address string) (string, error) diff --git a/go.mod b/go.mod index 617ee45f7..ca529ca0a 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 + github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5 github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6 github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index 05925648a..6d94ec5d5 100644 --- a/go.sum +++ b/go.sum @@ -1207,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE= github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU= -github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 h1:SlnFFjgrrtI2XKRWWa2ZQNqJ1qJ2/X0fYVKPoBI2c5Q= -github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= +github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5 h1:ur43GiEbW0iI+n+Iql3i1+wvgKRun/J10YcEsx985X0= +github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/node/status_node_services.go b/node/status_node_services.go index 199868505..52bf61b04 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -259,6 +259,8 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku, PeerExchange: nodeConfig.WakuV2Config.PeerExchange, DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit, PersistPeers: nodeConfig.WakuV2Config.PersistPeers, + DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes, + EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5, } if nodeConfig.WakuV2Config.MaxMessageSize > 0 { diff --git a/params/config.go b/params/config.go index dc340a491..16e36c06c 100644 --- a/params/config.go +++ b/params/config.go @@ -238,6 +238,15 @@ type WakuV2Config struct { // PeerExchange determines whether GossipSub Peer Exchange is enabled or not PeerExchange bool + + // EnableDiscV5 indicates if DiscoveryV5 is enabled or not + EnableDiscV5 bool + + // UDPPort number to start discovery v5 + UDPPort int + + // AutoUpdate instructs the node to update their own ip address and port with the values seen by other nodes + AutoUpdate bool } // ---------- @@ -301,6 +310,9 @@ type ClusterConfig struct { // WakuRendezvousNodes is a list of go-waku rendezvous nodes to be used for ambient discovery WakuRendezvousNodes []string + + // DiscV5Nodes is a list of enr to be used for ambient discovery + DiscV5BootstrapNodes []string } // String dumps config object as nicely indented JSON diff --git a/protocol/messenger_discv5.go b/protocol/messenger_discv5.go new file mode 100644 index 000000000..2f5789510 --- /dev/null +++ b/protocol/messenger_discv5.go @@ -0,0 +1,9 @@ +package protocol + +func (m *Messenger) StartDiscV5() error { + return m.transport.StartDiscV5() +} + +func (m *Messenger) StopDiscV5() error { + return m.transport.StopDiscV5() +} diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index d8bc13a1d..425c59711 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -603,6 +603,14 @@ func PubkeyToHex(key *ecdsa.PublicKey) string { return types.EncodeHex(crypto.FromECDSAPub(key)) } +func (t *Transport) StartDiscV5() error { + return t.waku.StartDiscV5() +} + +func (t *Transport) StopDiscV5() error { + return t.waku.StopDiscV5() +} + func (t *Transport) AddStorePeer(address string) (string, error) { return t.waku.AddStorePeer(address) } diff --git a/services/ext/api.go b/services/ext/api.go index f4a339e0c..1f783a142 100644 --- a/services/ext/api.go +++ b/services/ext/api.go @@ -921,6 +921,14 @@ func (api *PublicAPI) BloomFilter() string { return hexutil.Encode(api.service.messenger.BloomFilter()) } +func (api *PublicAPI) StartDiscV5() error { + return api.service.messenger.StartDiscV5() +} + +func (api *PublicAPI) StopDiscV5() error { + return api.service.messenger.StopDiscV5() +} + func (api *PublicAPI) AddStorePeer(address string) (string, error) { return api.service.messenger.AddStorePeer(address) } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discv5/discover.go b/vendor/github.com/status-im/go-waku/waku/v2/discv5/discover.go new file mode 100644 index 000000000..4faacbc2d --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/discv5/discover.go @@ -0,0 +1,453 @@ +package discv5 + +import ( + "context" + "crypto/ecdsa" + "fmt" + "math" + "math/rand" + "net" + "sync" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/utils" +) + +var log = logging.Logger("waku_discv5") + +type DiscoveryV5 struct { + sync.Mutex + + discovery.Discovery + + params *discV5Parameters + host host.Host + config discover.Config + udpAddr *net.UDPAddr + listener *discover.UDPv5 + localnode *enode.LocalNode + + peerCache peerCache +} + +type peerCache struct { + sync.RWMutex + recs map[peer.ID]peerRecord + rng *rand.Rand +} + +type peerRecord struct { + expire int64 + peer peer.AddrInfo +} + +type discV5Parameters struct { + autoUpdate bool + bootnodes []*enode.Node + udpPort int + tcpPort int + advertiseAddr *net.IP +} + +const WakuENRField = "waku2" + +// WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/). +type WakuEnrBitfield = uint8 + +type DiscoveryV5Option func(*discV5Parameters) + +func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.autoUpdate = autoUpdate + } +} + +func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.bootnodes = bootnodes + } +} + +func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.advertiseAddr = &addr + } +} + +func WithUDPPort(port int) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.udpPort = port + } +} + +func DefaultOptions() []DiscoveryV5Option { + return []DiscoveryV5Option{ + WithUDPPort(9000), + } +} + +func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield { + var v uint8 = 0 + + if lightpush { + v |= (1 << 3) + } + + if filter { + v |= (1 << 2) + } + + if store { + v |= (1 << 1) + } + + if relay { + v |= (1 << 0) + } + + return v +} + +func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.PrivateKey, wakuFlags WakuEnrBitfield, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { + params := new(discV5Parameters) + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + + params.tcpPort = tcpPort + + localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddr) + if err != nil { + return nil, err + } + + return &DiscoveryV5{ + host: host, + params: params, + peerCache: peerCache{ + rng: rand.New(rand.NewSource(rand.Int63())), + recs: make(map[peer.ID]peerRecord), + }, + localnode: localnode, + config: discover.Config{ + PrivateKey: priv, + Bootnodes: params.bootnodes, + }, + udpAddr: &net.UDPAddr{ + IP: net.IPv4zero, + Port: params.udpPort, + }, + }, nil +} + +func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(WakuENRField, wakuFlags)) + localnode.Set(enr.IP(ipAddr)) + + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("could not set udpPort ", udpPort) + } + + if tcpPort > 0 && tcpPort <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(tcpPort))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("could not set tcpPort ", tcpPort) + } + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + return localnode, nil +} + +func (d *DiscoveryV5) listen() error { + conn, err := net.ListenUDP("udp", d.udpAddr) + if err != nil { + return err + } + + listener, err := discover.ListenV5(conn, d.localnode, d.config) + if err != nil { + return err + } + + d.listener = listener + + return nil +} + +func (d *DiscoveryV5) Start() error { + d.Lock() + defer d.Unlock() + + err := d.listen() + if err != nil { + return err + } + + log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort)) + log.Info("Discovery V5 ", d.localnode.Node()) + + return nil +} + +func (d *DiscoveryV5) Stop() { + d.Lock() + defer d.Unlock() + + d.listener.Close() + d.listener = nil + log.Info("Stopped Discovery V5") +} + +// IsPrivate reports whether ip is a private address, according to +// RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses). +// Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go +// Copyright (c) The Go Authors. All rights reserved. +// @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language +func IsPrivate(ip net.IP) bool { + if ip4 := ip.To4(); ip4 != nil { + // Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says: + // The Internet Assigned Numbers Authority (IANA) has reserved the + // following three blocks of the IPv4 address space for private internets: + // 10.0.0.0 - 10.255.255.255 (10/8 prefix) + // 172.16.0.0 - 172.31.255.255 (172.16/12 prefix) + // 192.168.0.0 - 192.168.255.255 (192.168/16 prefix) + return ip4[0] == 10 || + (ip4[0] == 172 && ip4[1]&0xf0 == 16) || + (ip4[0] == 192 && ip4[1] == 168) + } + // Following RFC 4193, Section 3. Private Address Space which says: + // The Internet Assigned Numbers Authority (IANA) has reserved the + // following block of the IPv6 address space for local internets: + // FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix) + return len(ip) == net.IPv6len && ip[0]&0xfe == 0xfc +} + +func (d *DiscoveryV5) UpdateAddr(addr net.IP) error { + if !d.params.autoUpdate { + return nil + } + + d.Lock() + defer d.Unlock() + + if addr.IsUnspecified() || d.localnode.Node().IP().Equal(addr) { + return nil + } + + // TODO: improve this logic to determine if an address should be replaced or not + if !(d.localnode.Node().IP().IsLoopback() && IsPrivate(addr)) && !(IsPrivate(d.localnode.Node().IP()) && !addr.IsLoopback() && !IsPrivate(addr)) { + return nil + } + + d.localnode.Set(enr.IP(addr)) + + log.Info(fmt.Sprintf("Updated Discovery V5 node IP: %s", d.localnode.Node().IP())) + log.Info("Discovery V5 ", d.localnode.Node()) + + return nil +} + +func isWakuNode(node *enode.Node) bool { + enrField := new(WakuEnrBitfield) + if err := node.Record().Load(enr.WithEntry(WakuENRField, &enrField)); err != nil { + if !enr.IsNotFound(err) { + log.Error("could not retrieve port for enr ", node) + } + return false + } + + if enrField != nil { + return *enrField != uint8(0) + } + + return false +} + +func hasTCPPort(node *enode.Node) bool { + enrTCP := new(enr.TCP) + if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil { + if !enr.IsNotFound(err) { + log.Error("could not retrieve port for enr ", node) + } + return false + } + + return true +} + +func (d *DiscoveryV5) evaluateNode(node *enode.Node) bool { + if node == nil || node.IP() == nil { + return false + } + + if !isWakuNode(node) || !hasTCPPort(node) { + return false + } + + _, err := utils.EnodeToPeerInfo(node) + + if err != nil { + log.Error("could not obtain peer info from enode:", err) + return false + } + + return true +} + +func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + // Get options + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return 0, err + } + + // TODO: once discv5 spec introduces capability and topic discovery, implement this function + + return 20 * time.Minute, nil +} + +func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int, doneCh chan struct{}) { + for { + if len(d.peerCache.recs) >= limit { + break + } + + if ctx.Err() != nil { + break + } + + exists := iterator.Next() + if !exists { + break + } + + address, err := utils.EnodeToMultiAddr(iterator.Node()) + if err != nil { + log.Error(err) + continue + } + + peerInfo, err := peer.AddrInfoFromP2pAddr(address) + if err != nil { + log.Error(err) + continue + } + + d.peerCache.recs[peerInfo.ID] = peerRecord{ + expire: time.Now().Unix() + 3600, // Expires in 1hr + peer: *peerInfo, + } + } + + close(doneCh) +} + +func (d *DiscoveryV5) removeExpiredPeers() int { + // Remove all expired entries from cache + currentTime := time.Now().Unix() + newCacheSize := len(d.peerCache.recs) + + for p := range d.peerCache.recs { + rec := d.peerCache.recs[p] + if rec.expire < currentTime { + newCacheSize-- + delete(d.peerCache.recs, p) + } + } + + return newCacheSize +} + +func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + // Get options + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return nil, err + } + + const maxLimit = 100 + limit := options.Limit + if limit == 0 || limit > maxLimit { + limit = maxLimit + } + + // We are ignoring the topic. Future versions might use a map[string]*peerCache instead where the string represents the pubsub topic + + d.peerCache.Lock() + defer d.peerCache.Unlock() + + cacheSize := d.removeExpiredPeers() + + // Discover new records if we don't have enough + if cacheSize < limit && d.listener != nil { + d.Lock() + + iterator := d.listener.RandomNodes() + iterator = enode.Filter(iterator, d.evaluateNode) + defer iterator.Close() + + doneCh := make(chan struct{}) + go d.iterate(ctx, iterator, limit, doneCh) + + select { + case <-ctx.Done(): + case <-doneCh: + } + + d.Unlock() + } + + // Randomize and fill channel with available records + count := len(d.peerCache.recs) + if limit < count { + count = limit + } + + chPeer := make(chan peer.AddrInfo, count) + + perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count] + permSet := make(map[int]int) + for i, v := range perm { + permSet[v] = i + } + + sendLst := make([]*peer.AddrInfo, count) + iter := 0 + for k := range d.peerCache.recs { + if sendIndex, ok := permSet[iter]; ok { + peerInfo := d.peerCache.recs[k].peer + sendLst[sendIndex] = &peerInfo + } + iter++ + } + + for _, send := range sendLst { + chPeer <- *send + } + + close(chPeer) + + return chPeer, err +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discovery/enr.go b/vendor/github.com/status-im/go-waku/waku/v2/dnsdisc/enr.go similarity index 72% rename from vendor/github.com/status-im/go-waku/waku/v2/discovery/enr.go rename to vendor/github.com/status-im/go-waku/waku/v2/dnsdisc/enr.go index 3fccba245..0d9212a9e 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/discovery/enr.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/dnsdisc/enr.go @@ -1,12 +1,10 @@ -package discovery +package dnsdisc import ( "context" - "fmt" "github.com/ethereum/go-ethereum/p2p/dnsdisc" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/utils" ma "github.com/multiformats/go-multiaddr" ) @@ -44,7 +42,7 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) } for _, node := range tree.Nodes() { - m, err := EnodeToMultiAddr(node) + m, err := utils.EnodeToMultiAddr(node) if err != nil { return nil, err } @@ -54,12 +52,3 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) return multiAddrs, nil } - -func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { - peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()}) - if err != nil { - return nil, err - } - - return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID)) -} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discovery/resolver.go b/vendor/github.com/status-im/go-waku/waku/v2/dnsdisc/resolver.go similarity index 96% rename from vendor/github.com/status-im/go-waku/waku/v2/discovery/resolver.go rename to vendor/github.com/status-im/go-waku/waku/v2/dnsdisc/resolver.go index 9708f42e0..8985b33b9 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/discovery/resolver.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/dnsdisc/resolver.go @@ -1,4 +1,4 @@ -package discovery +package dnsdisc import ( "context" diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go b/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go index 4a72f4b35..0a20d0e8e 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go @@ -77,7 +77,7 @@ func (c ConnectionNotifier) Close() { func (w *WakuNode) sendConnStatus() { isOnline, hasHistory := w.Status() if w.connStatusChan != nil { - connStatus := ConnStatus{IsOnline: isOnline, HasHistory: hasHistory, Peers: w.Peers()} + connStatus := ConnStatus{IsOnline: isOnline, HasHistory: hasHistory, Peers: w.PeerStats()} w.connStatusChan <- connStatus } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index 32b3de513..614861c40 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -3,6 +3,8 @@ package node import ( "context" "fmt" + "net" + "strconv" "time" logging "github.com/ipfs/go-log" @@ -10,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" p2pproto "github.com/libp2p/go-libp2p-core/protocol" @@ -20,6 +23,7 @@ import ( rendezvous "github.com/status-im/go-waku-rendezvous" v2 "github.com/status-im/go-waku/waku/v2" + "github.com/status-im/go-waku/waku/v2/discv5" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" @@ -32,6 +36,13 @@ var log = logging.Logger("wakunode") type Message []byte +type Peer struct { + ID peer.ID + Protocols []string + Addrs []ma.Multiaddr + Connected bool +} + type WakuNode struct { host host.Host opts *WakuNodeParameters @@ -42,11 +53,16 @@ type WakuNode struct { rendezvous *rendezvous.RendezvousService store *store.WakuStore + addrChan chan ma.Multiaddr + + discoveryV5 *discv5.DiscoveryV5 + bcaster v2.Broadcaster connectionNotif ConnectionNotifier protocolEventSub event.Subscription identificationEventSub event.Subscription + addressChangesSub event.Subscription ctx context.Context cancel context.CancelFunc @@ -64,6 +80,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { params.libP2POpts = DefaultLibP2POptions + opts = append(DefaultWakuNodeOptions, opts...) for _, opt := range opts { err := opt(params) if err != nil { @@ -72,6 +89,14 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } } + // Setting default host address if none was provided + if params.hostAddr == nil { + err := WithHostAddress(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})(params) + if err != nil { + cancel() + return nil, err + } + } if len(params.multiAddr) > 0 { params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...)) } @@ -97,6 +122,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.ctx = ctx w.opts = params w.quit = make(chan struct{}) + w.addrChan = make(chan ma.Multiaddr, 1024) if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { return nil, err @@ -106,6 +132,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } + if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil { + return nil, err + } + if params.connStatusC != nil { w.connStatusChan = params.connStatusC } @@ -119,13 +149,83 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.startKeepAlive(w.opts.keepAliveInterval) } - for _, addr := range w.ListenAddresses() { - log.Info("Listening on ", addr) - } + go w.checkForAddressChanges() + go w.onAddrChange() return w, nil } +func (w *WakuNode) onAddrChange() { + for m := range w.addrChan { + ipStr, err := m.ValueForProtocol(ma.P_IP4) + if err != nil { + log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error())) + continue + } + ip := net.ParseIP(ipStr) + if !ip.IsLoopback() && !ip.IsUnspecified() { + if w.opts.enableDiscV5 { + err := w.discoveryV5.UpdateAddr(ip) + if err != nil { + log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s: %s", ip, err.Error())) + continue + } + } + } + } +} + +func (w *WakuNode) logAddress(addr ma.Multiaddr) { + log.Info("Listening on ", addr) + + // TODO: make this optional depending on DNS Disc being enabled + if w.opts.privKey != nil { + enr, ip, err := utils.GetENRandIP(addr, w.opts.privKey) + if err != nil { + log.Error("could not obtain ENR record from multiaddress", err) + } else { + log.Info(fmt.Sprintf("ENR for IP %s: %s", ip, enr)) + } + } +} + +func (w *WakuNode) checkForAddressChanges() { + addrs := w.ListenAddresses() + first := make(chan struct{}, 1) + first <- struct{}{} + for { + select { + case <-w.quit: + return + case <-first: + for _, addr := range addrs { + w.logAddress(addr) + } + case <-w.addressChangesSub.Out(): + newAddrs := w.ListenAddresses() + print := false + if len(addrs) != len(newAddrs) { + print = true + } else { + for i := range newAddrs { + if addrs[i].String() != newAddrs[i].String() { + print = true + break + } + } + } + if print { + addrs = newAddrs + log.Warn("Change in host multiaddresses") + for _, addr := range newAddrs { + w.addrChan <- addr + w.logAddress(addr) + } + } + } + } +} + func (w *WakuNode) Start() error { w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) if w.opts.enableStore { @@ -141,6 +241,17 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...)) } + if w.opts.enableDiscV5 { + err := w.mountDiscV5() + if err != nil { + return err + } + } + + if w.opts.enableDiscV5 { + w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...)) + } + err := w.mountRelay(w.opts.wOpts...) if err != nil { return err @@ -178,12 +289,14 @@ func (w *WakuNode) Stop() { defer w.cancel() close(w.quit) + close(w.addrChan) w.bcaster.Close() defer w.connectionNotif.Close() defer w.protocolEventSub.Close() defer w.identificationEventSub.Close() + defer w.addressChangesSub.Close() if w.rendezvous != nil { w.rendezvous.Stop() @@ -233,6 +346,14 @@ func (w *WakuNode) Lightpush() *lightpush.WakuLightPush { return w.lightPush } +func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 { + return w.discoveryV5 +} + +func (w *WakuNode) Broadcaster() v2.Broadcaster { + return w.bcaster +} + func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { var err error w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...) @@ -241,7 +362,7 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { } if w.opts.enableRelay { - _, err = w.relay.Subscribe(w.ctx, nil) + _, err = w.relay.Subscribe(w.ctx) if err != nil { return err } @@ -252,6 +373,41 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { return err } +func (w *WakuNode) mountDiscV5() error { + wakuFlag := discv5.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay) + + discV5Options := []discv5.DiscoveryV5Option{ + discv5.WithBootnodes(w.opts.discV5bootnodes), + discv5.WithUDPPort(w.opts.udpPort), + discv5.WithAutoUpdate(w.opts.discV5autoUpdate), + } + + addr := w.ListenAddresses()[0] + + ipStr, err := addr.ValueForProtocol(ma.P_IP4) + if err != nil { + return err + } + + portStr, err := addr.ValueForProtocol(ma.P_TCP) + if err != nil { + return err + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return err + } + + discoveryV5, err := discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, wakuFlag, discV5Options...) + if err != nil { + return err + } + + w.discoveryV5 = discoveryV5 + return nil +} + func (w *WakuNode) mountRendezvous() error { w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage) @@ -270,14 +426,16 @@ func (w *WakuNode) startStore() { // TODO: extract this to a function and run it when you go offline // TODO: determine if a store is listening to a topic go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { - t := time.NewTicker(time.Second) peerVerif: for { select { case <-w.quit: return - case <-t.C: + case <-ticker.C: _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3)) if err == nil { break peerVerif @@ -383,7 +541,7 @@ func (w *WakuNode) PeerCount() int { return len(w.host.Network().Peers()) } -func (w *WakuNode) Peers() PeerStats { +func (w *WakuNode) PeerStats() PeerStats { p := make(PeerStats) for _, peerID := range w.host.Network().Peers() { protocols, err := w.host.Peerstore().GetProtocols(peerID) @@ -395,6 +553,26 @@ func (w *WakuNode) Peers() PeerStats { return p } +func (w *WakuNode) Peers() ([]*Peer, error) { + var peers []*Peer + for _, peerId := range w.host.Peerstore().Peers() { + connected := w.host.Network().Connectedness(peerId) == network.Connected + protocols, err := w.host.Peerstore().GetProtocols(peerId) + if err != nil { + return nil, err + } + + addrs := w.host.Peerstore().Addrs(peerId) + peers = append(peers, &Peer{ + ID: peerId, + Protocols: protocols, + Connected: connected, + Addrs: addrs, + }) + } + return peers, nil +} + // startKeepAlive creates a go routine that periodically pings connected peers. // This is necessary because TCP connections are automatically closed due to inactivity, // and doing a ping will avoid this (with a small bandwidth cost) @@ -402,6 +580,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { log.Info("Setting up ping protocol with duration of ", t) ticker := time.NewTicker(t) + defer ticker.Stop() go func() { for { @@ -419,7 +598,6 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { } } case <-w.quit: - ticker.Stop() return } } @@ -435,9 +613,9 @@ func pingPeer(ctx context.Context, host host.Host, peer peer.ID) { select { case res := <-pr: if res.Error != nil { - log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) + log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) } case <-ctx.Done(): - log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) + log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) } } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go index 1afd6e1a4..061652371 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" @@ -23,9 +24,11 @@ import ( const clientId string = "Go Waku v2 node" type WakuNodeParameters struct { + hostAddr *net.TCPAddr + advertiseAddr *net.IP multiAddr []ma.Multiaddr addressFactory basichost.AddrsFactory - privKey *crypto.PrivKey + privKey *ecdsa.PrivateKey libP2POpts []libp2p.Option enableRelay bool @@ -45,6 +48,12 @@ type WakuNodeParameters struct { rendevousStorage rendezvous.Storage rendezvousOpts []pubsub.DiscoverOpt + enableDiscV5 bool + udpPort int + discV5bootnodes []*enode.Node + discV5Opts []pubsub.DiscoverOpt + discV5autoUpdate bool + keepAliveInterval time.Duration enableLightPush bool @@ -54,6 +63,11 @@ type WakuNodeParameters struct { type WakuNodeOption func(*WakuNodeParameters) error +// Default options used in the libp2p node +var DefaultWakuNodeOptions = []WakuNodeOption{ + WithWakuRelay(), +} + // MultiAddresses return the list of multiaddresses configured in the node func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr { return w.multiAddr @@ -61,39 +75,43 @@ func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr { // Identity returns a libp2p option containing the identity used by the node func (w WakuNodeParameters) Identity() config.Option { - return libp2p.Identity(*w.privKey) + return libp2p.Identity(*w.GetPrivKey()) } -// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a list of net endpoint addresses -func WithHostAddress(hostAddr []*net.TCPAddr) WakuNodeOption { - return func(params *WakuNodeParameters) error { - var multiAddresses []ma.Multiaddr - for _, addr := range hostAddr { - hostAddrMA, err := manet.FromNetAddr(addr) - if err != nil { - return err - } - multiAddresses = append(multiAddresses, hostAddrMA) - } +func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory { + return w.addressFactory +} - params.multiAddr = append(params.multiAddr, multiAddresses...) +// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a specific address +func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.hostAddr = hostAddr + hostAddrMA, err := manet.FromNetAddr(hostAddr) + if err != nil { + return err + } + params.multiAddr = append(params.multiAddr, hostAddrMA) return nil } } -// WithAdvertiseAddress is a WakuNodeOption that allows overriding the addresses used in the waku node with custom values -func WithAdvertiseAddress(addressesToAdvertise []*net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption { +// WithAdvertiseAddress is a WakuNodeOption that allows overriding the address used in the waku node with custom value +func WithAdvertiseAddress(address *net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption { return func(params *WakuNodeParameters) error { + params.advertiseAddr = &address.IP + + advertiseAddress, err := manet.FromNetAddr(address) + if err != nil { + return err + } + params.addressFactory = func([]ma.Multiaddr) []ma.Multiaddr { var result []multiaddr.Multiaddr - for _, adv := range addressesToAdvertise { - addr, _ := manet.FromNetAddr(adv) - result = append(result, addr) - if enableWS { - wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", adv.IP.String(), wsPort)) - result = append(result, wsMa) - } + result = append(result, advertiseAddress) + if enableWS { + wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", address, wsPort)) + result = append(result, wsMa) } return result } @@ -112,12 +130,16 @@ func WithMultiaddress(addresses []ma.Multiaddr) WakuNodeOption { // WithPrivateKey is used to set an ECDSA private key in a libp2p node func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption { return func(params *WakuNodeParameters) error { - privk := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) - params.privKey = &privk + params.privKey = privKey return nil } } +func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey { + privKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(w.privKey)) + return &privKey +} + // WithLibP2POptions is a WakuNodeOption used to configure the libp2p node. // This can potentially override any libp2p config that was set with other // WakuNodeOption @@ -138,6 +160,18 @@ func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption { } } +// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery +func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableDiscV5 = true + params.udpPort = udpPort + params.discV5bootnodes = bootnodes + params.discV5Opts = discoverOpts + params.discV5autoUpdate = autoUpdate + return nil + } +} + // WithRendezvous is a WakuOption used to enable go-waku-rendezvous discovery. // It accepts an optional list of DiscoveryOpt options func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { @@ -232,7 +266,6 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption { var DefaultLibP2POptions = []libp2p.Option{ libp2p.DefaultTransports, libp2p.UserAgent(clientId), - libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. libp2p.EnableNATService(), // TODO: is this needed?) libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)), } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go index 161a9782c..efafb5ed9 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go @@ -15,7 +15,6 @@ import ( "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "github.com/status-im/go-waku/waku/v2/utils" "go.opencensus.io/stats" "go.opencensus.io/tag" ) @@ -27,13 +26,6 @@ var ( ) type ( - FilterSubscribeParameters struct { - host host.Host - selectedPeer peer.ID - } - - FilterSubscribeOption func(*FilterSubscribeParameters) - Filter struct { PeerID peer.ID Topic string @@ -65,41 +57,32 @@ type ( // NOTE This is just a start, the design of this protocol isn't done yet. It // should be direct payload exchange (a la req-resp), not be coupled with the // relay protocol. - const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") -func WithPeer(p peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - params.selectedPeer = p +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter { + ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) + if err != nil { + log.Error(err) } -} -func WithAutomaticPeerSelection() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1)) - if err == nil { - params.selectedPeer = *p - } else { - log.Info("Error selecting peer: ", err) - } - } -} + wf := new(WakuFilter) + wf.ctx = ctx + wf.MsgC = make(chan *protocol.Envelope) + wf.h = host + wf.isFullNode = isFullNode + wf.filters = NewFilterMap() + wf.subscribers = NewSubscribers() -func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) - if err == nil { - params.selectedPeer = *p - } else { - log.Info("Error selecting peer: ", err) - } - } -} + wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) + go wf.FilterListener() -func DefaultOptions() []FilterSubscribeOption { - return []FilterSubscribeOption{ - WithAutomaticPeerSelection(), + if wf.isFullNode { + log.Info("Filter protocol started") + } else { + log.Info("Filter protocol started (only client mode)") } + + return wf } func (wf *WakuFilter) onRequest(s network.Stream) { @@ -148,32 +131,6 @@ func (wf *WakuFilter) onRequest(s network.Stream) { } } -func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter { - ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) - if err != nil { - log.Error(err) - } - - wf := new(WakuFilter) - wf.ctx = ctx - wf.MsgC = make(chan *protocol.Envelope) - wf.h = host - wf.isFullNode = isFullNode - wf.filters = NewFilterMap() - wf.subscribers = NewSubscribers() - - wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) - go wf.FilterListener() - - if wf.isFullNode { - log.Info("Filter protocol started") - } else { - log.Info("Filter protocol started (only client mode)") - } - - return wf -} - func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter_option.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter_option.go new file mode 100644 index 000000000..11c35953f --- /dev/null +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter_option.go @@ -0,0 +1,52 @@ +package filter + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/utils" +) + +type ( + FilterSubscribeParameters struct { + host host.Host + selectedPeer peer.ID + } + + FilterSubscribeOption func(*FilterSubscribeParameters) +) + +func WithPeer(p peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + params.selectedPeer = p + } +} + +func WithAutomaticPeerSelection() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + +func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + +func DefaultOptions() []FilterSubscribeOption { + return []FilterSubscribeOption{ + WithAutomaticPeerSelection(), + } +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 053364b45..290fa6704 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -77,13 +77,13 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { log.Info("lightpush push request") response := new(pb.PushResponse) if !wakuLP.IsClientOnly() { - pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic) + pubSubTopic := requestPushRPC.Query.PubsubTopic message := requestPushRPC.Query.Message // TODO: Assumes success, should probably be extended to check for network, peers, etc // It might make sense to use WithReadiness option here? - _, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic) + _, err := wakuLP.relay.PublishToTopic(wakuLP.ctx, message, pubSubTopic) if err != nil { response.IsSuccess = false @@ -181,14 +181,14 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) { +func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } req := new(pb.PushRequest) req.Message = message - req.PubsubTopic = string(relay.GetTopic(topic)) + req.PubsubTopic = topic response, err := wakuLP.request(ctx, req, opts...) if err != nil { @@ -202,3 +202,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessag return nil, errors.New(response.Info) } } + +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, opts ...LightPushOption) ([]byte, error) { + return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...) +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go index be1e2cf42..9bff73959 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -24,11 +24,9 @@ import ( var log = logging.Logger("wakurelay") -type Topic string - const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") -var DefaultWakuTopic Topic = Topic(waku_proto.DefaultPubsubTopic().String()) +var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() type WakuRelay struct { host host.Host @@ -37,13 +35,12 @@ type WakuRelay struct { bcaster v2.Broadcaster // TODO: convert to concurrent maps - topics map[Topic]struct{} topicsMutex sync.Mutex - wakuRelayTopics map[Topic]*pubsub.Topic - relaySubs map[Topic]*pubsub.Subscription + wakuRelayTopics map[string]*pubsub.Topic + relaySubs map[string]*pubsub.Subscription // TODO: convert to concurrent maps - subscriptions map[Topic][]*Subscription + subscriptions map[string][]*Subscription subscriptionsMutex sync.Mutex } @@ -56,10 +53,9 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h - w.topics = make(map[Topic]struct{}) - w.wakuRelayTopics = make(map[Topic]*pubsub.Topic) - w.relaySubs = make(map[Topic]*pubsub.Subscription) - w.subscriptions = make(map[Topic][]*Subscription) + w.wakuRelayTopics = make(map[string]*pubsub.Topic) + w.relaySubs = make(map[string]*pubsub.Subscription) + w.subscriptions = make(map[string][]*Subscription) w.bcaster = bcaster // default options required by WakuRelay @@ -96,12 +92,12 @@ func (w *WakuRelay) PubSub() *pubsub.PubSub { return w.pubsub } -func (w *WakuRelay) Topics() []Topic { +func (w *WakuRelay) Topics() []string { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() - var result []Topic - for topic := range w.topics { + var result []string + for topic := range w.relaySubs { result = append(result, topic) } return result @@ -111,11 +107,10 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) { w.pubsub = pubSub } -func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { +func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() - w.topics[topic] = struct{}{} pubSubTopic, ok := w.wakuRelayTopics[topic] if !ok { // Joins topic if node hasn't joined yet newTopic, err := w.pubsub.Join(string(topic)) @@ -128,7 +123,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { return pubSubTopic, nil } -func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error) { +func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) { sub, ok := w.relaySubs[topic] if !ok { pubSubTopic, err := w.upsertTopic(topic) @@ -148,7 +143,7 @@ func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error return sub, nil } -func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) { +func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. if w.pubsub == nil { return nil, errors.New("PubSub hasn't been set") @@ -158,7 +153,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic return nil, errors.New("message can't be null") } - pubSubTopic, err := w.upsertTopic(GetTopic(topic)) + pubSubTopic, err := w.upsertTopic(topic) if err != nil { return nil, err @@ -179,12 +174,8 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic return hash, nil } -func GetTopic(topic *Topic) Topic { - var t Topic = DefaultWakuTopic - if topic != nil { - t = *topic - } - return t +func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) { + return w.PublishToTopic(ctx, message, DefaultWakuTopic) } func (w *WakuRelay) Stop() { @@ -200,11 +191,10 @@ func (w *WakuRelay) Stop() { w.subscriptions = nil } -func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, error) { +func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. - t := GetTopic(topic) - sub, err := w.subscribe(t) + sub, err := w.subscribe(topic) if err != nil { return nil, err @@ -219,23 +209,26 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, w.subscriptionsMutex.Lock() defer w.subscriptionsMutex.Unlock() - w.subscriptions[t] = append(w.subscriptions[t], subscription) + w.subscriptions[topic] = append(w.subscriptions[topic], subscription) if w.bcaster != nil { w.bcaster.Register(subscription.C) } - go w.subscribeToTopic(t, subscription, sub) + go w.subscribeToTopic(topic, subscription, sub) return subscription, nil } -func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error { - if _, ok := w.topics[topic]; !ok { +func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) { + return w.SubscribeToTopic(ctx, DefaultWakuTopic) +} + +func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { + if _, ok := w.relaySubs[topic]; !ok { return fmt.Errorf("topics %s is not subscribed", (string)(topic)) } log.Info("Unsubscribing from topic ", topic) - delete(w.topics, topic) for _, sub := range w.subscriptions[topic] { sub.Unsubscribe() @@ -268,7 +261,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < log.Error(fmt.Errorf("subscription failed: %w", err)) sub.Cancel() close(msgChannel) - for _, subscription := range w.subscriptions[Topic(sub.Topic())] { + for _, subscription := range w.subscriptions[sub.Topic()] { subscription.Unsubscribe() } } @@ -279,7 +272,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < return msgChannel } -func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *pubsub.Subscription) { +func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) { ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay")) if err != nil { log.Error(err) diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go index b47efdddc..300be1cfb 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go @@ -74,12 +74,15 @@ func (self *MessageQueue) cleanOlderRecords() { func (self *MessageQueue) checkForOlderRecords(d time.Duration) { ticker := time.NewTicker(d) + defer ticker.Stop() - select { - case <-self.quit: - return - case <-ticker.C: - self.cleanOlderRecords() + for { + select { + case <-self.quit: + return + case <-ticker.C: + self.cleanOlderRecords() + } } } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go index 862363a4d..28064b3aa 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/utils/peer.go @@ -2,11 +2,20 @@ package utils import ( "context" + "crypto/ecdsa" "errors" + "fmt" + "math" "math/rand" + "net" + "strconv" "sync" "time" + ma "github.com/multiformats/go-multiaddr" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -112,3 +121,62 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str return nil, ErrNoPeersAvailable } } + +func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { + peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()}) + if err != nil { + return nil, err + } + + return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID)) +} + +func EnodeToPeerInfo(node *enode.Node) (*peer.AddrInfo, error) { + address, err := EnodeToMultiAddr(node) + if err != nil { + return nil, err + } + + return peer.AddrInfoFromP2pAddr(address) +} + +func GetENRandIP(addr ma.Multiaddr, privK *ecdsa.PrivateKey) (*enode.Node, *net.TCPAddr, error) { + ip, err := addr.ValueForProtocol(ma.P_IP4) + if err != nil { + return nil, nil, err + } + + portStr, err := addr.ValueForProtocol(ma.P_TCP) + if err != nil { + return nil, nil, err + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, nil, err + } + + tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port)) + if err != nil { + return nil, nil, err + } + + r := &enr.Record{} + + if port > 0 && port <= math.MaxUint16 { + r.Set(enr.TCP(uint16(port))) // lgtm [go/incorrect-integer-conversion] + } else { + return nil, nil, fmt.Errorf("could not set port %d", port) + } + + r.Set(enr.IP(net.ParseIP(ip))) + + err = enode.SignV4(r, privK) + if err != nil { + return nil, nil, err + } + + node, err := enode.New(enode.ValidSchemes, r) + + return node, tcpAddr, err +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/discovery/public_key.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/public_key.go similarity index 99% rename from vendor/github.com/status-im/go-waku/waku/v2/discovery/public_key.go rename to vendor/github.com/status-im/go-waku/waku/v2/utils/public_key.go index d80770c8b..937adc49d 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/discovery/public_key.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/utils/public_key.go @@ -1,4 +1,4 @@ -package discovery +package utils import ( "crypto/ecdsa" diff --git a/vendor/modules.txt b/vendor/modules.txt index 503a6898b..97148607e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -447,10 +447,11 @@ github.com/spacemonkeygo/spacelog github.com/status-im/doubleratchet # github.com/status-im/go-multiaddr-ethv4 v1.2.1 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 +# github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5 github.com/status-im/go-waku/waku/persistence github.com/status-im/go-waku/waku/v2 -github.com/status-im/go-waku/waku/v2/discovery +github.com/status-im/go-waku/waku/v2/discv5 +github.com/status-im/go-waku/waku/v2/dnsdisc github.com/status-im/go-waku/waku/v2/metrics github.com/status-im/go-waku/waku/v2/node github.com/status-im/go-waku/waku/v2/protocol diff --git a/wakuv2/config.go b/wakuv2/config.go index c7b0deb06..381ef3142 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -39,7 +39,11 @@ type Config struct { LightpushNodes []string `toml:",omitempty"` Rendezvous bool `toml:",omitempty"` WakuRendezvousNodes []string `toml:",omitempty"` + DiscV5BootstrapNodes []string `toml:",omitempty"` + EnableDiscV5 bool `toml:",omitempty"` DiscoveryLimit int `toml:",omitempty"` + AutoUpdate bool `toml:",omitempty"` + UDPPort int `toml:",omitempty"` } var DefaultConfig = Config{ @@ -49,4 +53,42 @@ var DefaultConfig = Config{ KeepAliveInterval: 10, // second DiscoveryLimit: 40, MinPeersForRelay: 2, // TODO: determine correct value with Vac team + UDPPort: 9000, + AutoUpdate: false, +} + +func setDefaults(cfg *Config) *Config { + if cfg == nil { + cfg = new(Config) + } + + if cfg.MaxMessageSize == 0 { + cfg.MaxMessageSize = DefaultConfig.MaxMessageSize + } + + if cfg.Host == "" { + cfg.Host = DefaultConfig.Host + } + + if cfg.Port == 0 { + cfg.Port = DefaultConfig.Port + } + + if cfg.KeepAliveInterval == 0 { + cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval + } + + if cfg.DiscoveryLimit == 0 { + cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit + } + + if cfg.MinPeersForRelay == 0 { + cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay + } + + if cfg.UDPPort == 0 { + cfg.UDPPort = DefaultConfig.UDPPort + } + + return cfg } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index f2aaca1ff..3e5069c77 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -48,6 +48,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" "github.com/libp2p/go-libp2p" @@ -57,7 +58,7 @@ import ( libp2pproto "github.com/libp2p/go-libp2p-core/protocol" rendezvous "github.com/status-im/go-waku-rendezvous" - "github.com/status-im/go-waku/waku/v2/discovery" + "github.com/status-im/go-waku/waku/v2/dnsdisc" "github.com/status-im/go-waku/waku/v2/protocol" wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/filter" @@ -69,7 +70,7 @@ import ( "github.com/status-im/status-go/wakuv2/common" "github.com/status-im/status-go/wakuv2/persistence" - libp2pdisc "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/discovery" node "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/store" @@ -127,38 +128,6 @@ type Waku struct { logger *zap.Logger } -func setDefaults(cfg *Config) *Config { - if cfg == nil { - cfg = new(Config) - } - - if cfg.MaxMessageSize == 0 { - cfg.MaxMessageSize = DefaultConfig.MaxMessageSize - } - - if cfg.Host == "" { - cfg.Host = DefaultConfig.Host - } - - if cfg.Port == 0 { - cfg.Port = DefaultConfig.Port - } - - if cfg.KeepAliveInterval == 0 { - cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval - } - - if cfg.DiscoveryLimit == 0 { - cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit - } - - if cfg.MinPeersForRelay == 0 { - cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay - } - - return cfg -} - // New creates a WakuV2 client ready to communicate through the LibP2P network. func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, error) { if logger == nil { @@ -245,13 +214,25 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, opts := []node.WakuNodeOption{ node.WithLibP2POptions(libp2pOpts...), node.WithPrivateKey(privateKey), - node.WithHostAddress([]*net.TCPAddr{hostAddr}), + node.WithHostAddress(hostAddr), node.WithConnectionStatusChannel(connStatusChan), node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second), } if cfg.Rendezvous { - opts = append(opts, node.WithRendezvous(pubsub.WithDiscoveryOpts(libp2pdisc.Limit(cfg.DiscoveryLimit)))) + opts = append(opts, node.WithRendezvous(pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit)))) + } + + if cfg.EnableDiscV5 { + var bootnodes []*enode.Node + for _, addr := range cfg.DiscV5BootstrapNodes { + bootnode, err := enode.Parse(enode.ValidSchemes, addr) + if err != nil { + return nil, err + } + bootnodes = append(bootnodes, bootnode) + } + opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit)))) } if cfg.LightClient { @@ -327,7 +308,7 @@ func (w *Waku) dnsDiscover(enrtreeAddress string, protocol libp2pproto.ID, apply if !ok { w.dnsAddressCacheLock.Lock() var err error - multiaddresses, err = discovery.RetrieveNodes(ctx, enrtreeAddress) + multiaddresses, err = dnsdisc.RetrieveNodes(ctx, enrtreeAddress) w.dnsAddressCache[enrtreeAddress] = multiaddresses w.dnsAddressCacheLock.Unlock() if err != nil { @@ -396,7 +377,7 @@ func (w *Waku) runRelayMsgLoop() { return } - sub, err := w.node.Relay().Subscribe(context.Background(), nil) + sub, err := w.node.Relay().Subscribe(context.Background()) if err != nil { fmt.Println("Could not subscribe:", err) return @@ -429,8 +410,6 @@ func (w *Waku) runFilterMsgLoop() { } func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) { - pubsubTopic := relay.GetTopic(nil) - var contentTopics []string for _, topic := range topics { contentTopics = append(contentTopics, common.BytesToTopic(topic).ContentTopic()) @@ -438,7 +417,7 @@ func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) { var err error contentFilter := filter.ContentFilter{ - Topic: string(pubsubTopic), + Topic: relay.DefaultWakuTopic, ContentTopics: contentTopics, } @@ -764,7 +743,7 @@ func (w *Waku) Unsubscribe(id string) error { f := w.filters.Get(id) if f != nil && w.settings.LightClient { contentFilter := filter.ContentFilter{ - Topic: string(relay.GetTopic(nil)), + Topic: relay.DefaultWakuTopic, } for _, topic := range f.Topics { contentFilter.ContentTopics = append(contentFilter.ContentTopics, common.BytesToTopic(topic).ContentTopic()) @@ -795,8 +774,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error { } func (w *Waku) notEnoughPeers() bool { - topic := string(relay.GetTopic(nil)) - numPeers := len(w.node.Relay().PubSub().ListPeers(topic)) + numPeers := len(w.node.Relay().PubSub().ListPeers(relay.DefaultWakuTopic)) return numPeers <= w.settings.MinPeersForRelay } @@ -813,10 +791,10 @@ func (w *Waku) broadcast() { if w.settings.LightClient || w.notEnoughPeers() { log.Debug("publishing message via lightpush", zap.Any("hash", hexutil.Encode(hash))) - _, err = w.node.Lightpush().Publish(context.Background(), msg, nil) + _, err = w.node.Lightpush().Publish(context.Background(), msg) } else { log.Debug("publishing message via relay", zap.Any("hash", hexutil.Encode(hash))) - _, err = w.node.Relay().Publish(context.Background(), msg, nil) + _, err = w.node.Relay().Publish(context.Background(), msg) } if err != nil { @@ -856,7 +834,7 @@ func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) { _, alreadyCached := w.envelopes[gethcommon.BytesToHash(hash)] w.poolMu.Unlock() if !alreadyCached { - envelope := wakuprotocol.NewEnvelope(msg, string(relay.GetTopic(nil))) + envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic) recvMessage := common.NewReceivedMessage(envelope) w.postEvent(recvMessage) // notify the local node about the new message w.addEnvelope(recvMessage) @@ -875,7 +853,7 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s StartTime: float64(from), EndTime: float64(to), ContentTopics: strTopics, - Topic: string(relay.DefaultWakuTopic), + Topic: relay.DefaultWakuTopic, } ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) @@ -887,7 +865,7 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s } for _, msg := range result.Messages { - envelope := wakuprotocol.NewEnvelope(msg, string(relay.DefaultWakuTopic)) + envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic) w.logger.Debug("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash()))) _, err = w.OnNewEnvelopes(envelope) if err != nil { @@ -1035,7 +1013,24 @@ func (w *Waku) PeerCount() int { } func (w *Waku) Peers() map[string][]string { - return FormatPeerStats(w.node.Peers()) + return FormatPeerStats(w.node.PeerStats()) +} + +func (w *Waku) StartDiscV5() error { + if w.node.DiscV5() == nil { + return errors.New("discv5 is not setup") + } + + return w.node.DiscV5().Start() +} + +func (w *Waku) StopDiscV5() error { + if w.node.DiscV5() == nil { + return errors.New("discv5 is not setup") + } + + w.node.DiscV5().Stop() + return nil } func (w *Waku) AddStorePeer(address string) (string, error) {