feat: Add DiscV5 to Waku2 (#2434)

* feat: Add DiscV5 to Waku2
* fix: null verification for discV5
This commit is contained in:
Richard Ramos 2021-11-22 09:40:14 -04:00 committed by GitHub
parent 1d752c087f
commit d32f0467ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1043 additions and 214 deletions

View File

@ -43,6 +43,16 @@ func (w *gethWakuWrapper) PeerCount() int {
return -1
}
// Added for compatibility with waku V2
func (w *gethWakuWrapper) StartDiscV5() error {
return errors.New("not available in WakuV1")
}
// Added for compatibility with waku V2
func (w *gethWakuWrapper) StopDiscV5() error {
return errors.New("not available in WakuV1")
}
// PeerCount function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddStorePeer(address string) (string, error) {
return "", errors.New("not available in WakuV1")

View File

@ -222,6 +222,14 @@ func (w *gethWakuV2Wrapper) RequestHistoricMessagesWithTimeout(peerID []byte, en
return errors.New("DEPRECATED")
}
func (w *gethWakuV2Wrapper) StartDiscV5() error {
return w.waku.StartDiscV5()
}
func (w *gethWakuV2Wrapper) StopDiscV5() error {
return w.waku.StopDiscV5()
}
func (w *gethWakuV2Wrapper) AddStorePeer(address string) (string, error) {
return w.waku.AddStorePeer(address)
}

View File

@ -18,6 +18,10 @@ type Waku interface {
Peers() map[string][]string
StartDiscV5() error
StopDiscV5() error
AddStorePeer(address string) (string, error)
AddRelayPeer(address string) (string, error)

2
go.mod
View File

@ -49,7 +49,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1
github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432
github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6
github.com/status-im/migrate/v4 v4.6.2-status.2

4
go.sum
View File

@ -1207,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS
github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU=
github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1 h1:SlnFFjgrrtI2XKRWWa2ZQNqJ1qJ2/X0fYVKPoBI2c5Q=
github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5 h1:ur43GiEbW0iI+n+Iql3i1+wvgKRun/J10YcEsx985X0=
github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -259,6 +259,8 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
PeerExchange: nodeConfig.WakuV2Config.PeerExchange,
DiscoveryLimit: nodeConfig.WakuV2Config.DiscoveryLimit,
PersistPeers: nodeConfig.WakuV2Config.PersistPeers,
DiscV5BootstrapNodes: nodeConfig.ClusterConfig.DiscV5BootstrapNodes,
EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5,
}
if nodeConfig.WakuV2Config.MaxMessageSize > 0 {

View File

@ -238,6 +238,15 @@ type WakuV2Config struct {
// PeerExchange determines whether GossipSub Peer Exchange is enabled or not
PeerExchange bool
// EnableDiscV5 indicates if DiscoveryV5 is enabled or not
EnableDiscV5 bool
// UDPPort number to start discovery v5
UDPPort int
// AutoUpdate instructs the node to update their own ip address and port with the values seen by other nodes
AutoUpdate bool
}
// ----------
@ -301,6 +310,9 @@ type ClusterConfig struct {
// WakuRendezvousNodes is a list of go-waku rendezvous nodes to be used for ambient discovery
WakuRendezvousNodes []string
// DiscV5Nodes is a list of enr to be used for ambient discovery
DiscV5BootstrapNodes []string
}
// String dumps config object as nicely indented JSON

View File

@ -0,0 +1,9 @@
package protocol
func (m *Messenger) StartDiscV5() error {
return m.transport.StartDiscV5()
}
func (m *Messenger) StopDiscV5() error {
return m.transport.StopDiscV5()
}

View File

@ -603,6 +603,14 @@ func PubkeyToHex(key *ecdsa.PublicKey) string {
return types.EncodeHex(crypto.FromECDSAPub(key))
}
func (t *Transport) StartDiscV5() error {
return t.waku.StartDiscV5()
}
func (t *Transport) StopDiscV5() error {
return t.waku.StopDiscV5()
}
func (t *Transport) AddStorePeer(address string) (string, error) {
return t.waku.AddStorePeer(address)
}

View File

@ -921,6 +921,14 @@ func (api *PublicAPI) BloomFilter() string {
return hexutil.Encode(api.service.messenger.BloomFilter())
}
func (api *PublicAPI) StartDiscV5() error {
return api.service.messenger.StartDiscV5()
}
func (api *PublicAPI) StopDiscV5() error {
return api.service.messenger.StopDiscV5()
}
func (api *PublicAPI) AddStorePeer(address string) (string, error) {
return api.service.messenger.AddStorePeer(address)
}

View File

@ -0,0 +1,453 @@
package discv5
import (
"context"
"crypto/ecdsa"
"fmt"
"math"
"math/rand"
"net"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/utils"
)
var log = logging.Logger("waku_discv5")
type DiscoveryV5 struct {
sync.Mutex
discovery.Discovery
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
peerCache peerCache
}
type peerCache struct {
sync.RWMutex
recs map[peer.ID]peerRecord
rng *rand.Rand
}
type peerRecord struct {
expire int64
peer peer.AddrInfo
}
type discV5Parameters struct {
autoUpdate bool
bootnodes []*enode.Node
udpPort int
tcpPort int
advertiseAddr *net.IP
}
const WakuENRField = "waku2"
// WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/).
type WakuEnrBitfield = uint8
type DiscoveryV5Option func(*discV5Parameters)
func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.autoUpdate = autoUpdate
}
}
func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.bootnodes = bootnodes
}
}
func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.advertiseAddr = &addr
}
}
func WithUDPPort(port int) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.udpPort = port
}
}
func DefaultOptions() []DiscoveryV5Option {
return []DiscoveryV5Option{
WithUDPPort(9000),
}
}
func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
var v uint8 = 0
if lightpush {
v |= (1 << 3)
}
if filter {
v |= (1 << 2)
}
if store {
v |= (1 << 1)
}
if relay {
v |= (1 << 0)
}
return v
}
func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.PrivateKey, wakuFlags WakuEnrBitfield, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters)
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
params.tcpPort = tcpPort
localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddr)
if err != nil {
return nil, err
}
return &DiscoveryV5{
host: host,
params: params,
peerCache: peerCache{
rng: rand.New(rand.NewSource(rand.Int63())),
recs: make(map[peer.ID]peerRecord),
},
localnode: localnode,
config: discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
},
udpAddr: &net.UDPAddr{
IP: net.IPv4zero,
Port: params.udpPort,
},
}, nil
}
func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP) (*enode.LocalNode, error) {
db, err := enode.OpenDB("")
if err != nil {
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(WakuENRField, wakuFlags))
localnode.Set(enr.IP(ipAddr))
if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("could not set udpPort ", udpPort)
}
if tcpPort > 0 && tcpPort <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(tcpPort))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("could not set tcpPort ", tcpPort)
}
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
return localnode, nil
}
func (d *DiscoveryV5) listen() error {
conn, err := net.ListenUDP("udp", d.udpAddr)
if err != nil {
return err
}
listener, err := discover.ListenV5(conn, d.localnode, d.config)
if err != nil {
return err
}
d.listener = listener
return nil
}
func (d *DiscoveryV5) Start() error {
d.Lock()
defer d.Unlock()
err := d.listen()
if err != nil {
return err
}
log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort))
log.Info("Discovery V5 ", d.localnode.Node())
return nil
}
func (d *DiscoveryV5) Stop() {
d.Lock()
defer d.Unlock()
d.listener.Close()
d.listener = nil
log.Info("Stopped Discovery V5")
}
// IsPrivate reports whether ip is a private address, according to
// RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses).
// Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go
// Copyright (c) The Go Authors. All rights reserved.
// @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language
func IsPrivate(ip net.IP) bool {
if ip4 := ip.To4(); ip4 != nil {
// Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following three blocks of the IPv4 address space for private internets:
// 10.0.0.0 - 10.255.255.255 (10/8 prefix)
// 172.16.0.0 - 172.31.255.255 (172.16/12 prefix)
// 192.168.0.0 - 192.168.255.255 (192.168/16 prefix)
return ip4[0] == 10 ||
(ip4[0] == 172 && ip4[1]&0xf0 == 16) ||
(ip4[0] == 192 && ip4[1] == 168)
}
// Following RFC 4193, Section 3. Private Address Space which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following block of the IPv6 address space for local internets:
// FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix)
return len(ip) == net.IPv6len && ip[0]&0xfe == 0xfc
}
func (d *DiscoveryV5) UpdateAddr(addr net.IP) error {
if !d.params.autoUpdate {
return nil
}
d.Lock()
defer d.Unlock()
if addr.IsUnspecified() || d.localnode.Node().IP().Equal(addr) {
return nil
}
// TODO: improve this logic to determine if an address should be replaced or not
if !(d.localnode.Node().IP().IsLoopback() && IsPrivate(addr)) && !(IsPrivate(d.localnode.Node().IP()) && !addr.IsLoopback() && !IsPrivate(addr)) {
return nil
}
d.localnode.Set(enr.IP(addr))
log.Info(fmt.Sprintf("Updated Discovery V5 node IP: %s", d.localnode.Node().IP()))
log.Info("Discovery V5 ", d.localnode.Node())
return nil
}
func isWakuNode(node *enode.Node) bool {
enrField := new(WakuEnrBitfield)
if err := node.Record().Load(enr.WithEntry(WakuENRField, &enrField)); err != nil {
if !enr.IsNotFound(err) {
log.Error("could not retrieve port for enr ", node)
}
return false
}
if enrField != nil {
return *enrField != uint8(0)
}
return false
}
func hasTCPPort(node *enode.Node) bool {
enrTCP := new(enr.TCP)
if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil {
if !enr.IsNotFound(err) {
log.Error("could not retrieve port for enr ", node)
}
return false
}
return true
}
func (d *DiscoveryV5) evaluateNode(node *enode.Node) bool {
if node == nil || node.IP() == nil {
return false
}
if !isWakuNode(node) || !hasTCPPort(node) {
return false
}
_, err := utils.EnodeToPeerInfo(node)
if err != nil {
log.Error("could not obtain peer info from enode:", err)
return false
}
return true
}
func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
}
// TODO: once discv5 spec introduces capability and topic discovery, implement this function
return 20 * time.Minute, nil
}
func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int, doneCh chan struct{}) {
for {
if len(d.peerCache.recs) >= limit {
break
}
if ctx.Err() != nil {
break
}
exists := iterator.Next()
if !exists {
break
}
address, err := utils.EnodeToMultiAddr(iterator.Node())
if err != nil {
log.Error(err)
continue
}
peerInfo, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
log.Error(err)
continue
}
d.peerCache.recs[peerInfo.ID] = peerRecord{
expire: time.Now().Unix() + 3600, // Expires in 1hr
peer: *peerInfo,
}
}
close(doneCh)
}
func (d *DiscoveryV5) removeExpiredPeers() int {
// Remove all expired entries from cache
currentTime := time.Now().Unix()
newCacheSize := len(d.peerCache.recs)
for p := range d.peerCache.recs {
rec := d.peerCache.recs[p]
if rec.expire < currentTime {
newCacheSize--
delete(d.peerCache.recs, p)
}
}
return newCacheSize
}
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return nil, err
}
const maxLimit = 100
limit := options.Limit
if limit == 0 || limit > maxLimit {
limit = maxLimit
}
// We are ignoring the topic. Future versions might use a map[string]*peerCache instead where the string represents the pubsub topic
d.peerCache.Lock()
defer d.peerCache.Unlock()
cacheSize := d.removeExpiredPeers()
// Discover new records if we don't have enough
if cacheSize < limit && d.listener != nil {
d.Lock()
iterator := d.listener.RandomNodes()
iterator = enode.Filter(iterator, d.evaluateNode)
defer iterator.Close()
doneCh := make(chan struct{})
go d.iterate(ctx, iterator, limit, doneCh)
select {
case <-ctx.Done():
case <-doneCh:
}
d.Unlock()
}
// Randomize and fill channel with available records
count := len(d.peerCache.recs)
if limit < count {
count = limit
}
chPeer := make(chan peer.AddrInfo, count)
perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count]
permSet := make(map[int]int)
for i, v := range perm {
permSet[v] = i
}
sendLst := make([]*peer.AddrInfo, count)
iter := 0
for k := range d.peerCache.recs {
if sendIndex, ok := permSet[iter]; ok {
peerInfo := d.peerCache.recs[k].peer
sendLst[sendIndex] = &peerInfo
}
iter++
}
for _, send := range sendLst {
chPeer <- *send
}
close(chPeer)
return chPeer, err
}

View File

@ -1,12 +1,10 @@
package discovery
package dnsdisc
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/utils"
ma "github.com/multiformats/go-multiaddr"
)
@ -44,7 +42,7 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption)
}
for _, node := range tree.Nodes() {
m, err := EnodeToMultiAddr(node)
m, err := utils.EnodeToMultiAddr(node)
if err != nil {
return nil, err
}
@ -54,12 +52,3 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption)
return multiAddrs, nil
}
func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()})
if err != nil {
return nil, err
}
return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID))
}

View File

@ -1,4 +1,4 @@
package discovery
package dnsdisc
import (
"context"

View File

@ -77,7 +77,7 @@ func (c ConnectionNotifier) Close() {
func (w *WakuNode) sendConnStatus() {
isOnline, hasHistory := w.Status()
if w.connStatusChan != nil {
connStatus := ConnStatus{IsOnline: isOnline, HasHistory: hasHistory, Peers: w.Peers()}
connStatus := ConnStatus{IsOnline: isOnline, HasHistory: hasHistory, Peers: w.PeerStats()}
w.connStatusChan <- connStatus
}

View File

@ -3,6 +3,8 @@ package node
import (
"context"
"fmt"
"net"
"strconv"
"time"
logging "github.com/ipfs/go-log"
@ -10,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
p2pproto "github.com/libp2p/go-libp2p-core/protocol"
@ -20,6 +23,7 @@ import (
rendezvous "github.com/status-im/go-waku-rendezvous"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/discv5"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
@ -32,6 +36,13 @@ var log = logging.Logger("wakunode")
type Message []byte
type Peer struct {
ID peer.ID
Protocols []string
Addrs []ma.Multiaddr
Connected bool
}
type WakuNode struct {
host host.Host
opts *WakuNodeParameters
@ -42,11 +53,16 @@ type WakuNode struct {
rendezvous *rendezvous.RendezvousService
store *store.WakuStore
addrChan chan ma.Multiaddr
discoveryV5 *discv5.DiscoveryV5
bcaster v2.Broadcaster
connectionNotif ConnectionNotifier
protocolEventSub event.Subscription
identificationEventSub event.Subscription
addressChangesSub event.Subscription
ctx context.Context
cancel context.CancelFunc
@ -64,6 +80,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
params.libP2POpts = DefaultLibP2POptions
opts = append(DefaultWakuNodeOptions, opts...)
for _, opt := range opts {
err := opt(params)
if err != nil {
@ -72,6 +89,14 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
}
// Setting default host address if none was provided
if params.hostAddr == nil {
err := WithHostAddress(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})(params)
if err != nil {
cancel()
return nil, err
}
}
if len(params.multiAddr) > 0 {
params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...))
}
@ -97,6 +122,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.ctx = ctx
w.opts = params
w.quit = make(chan struct{})
w.addrChan = make(chan ma.Multiaddr, 1024)
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
return nil, err
@ -106,6 +132,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err
}
if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil {
return nil, err
}
if params.connStatusC != nil {
w.connStatusChan = params.connStatusC
}
@ -119,13 +149,83 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.startKeepAlive(w.opts.keepAliveInterval)
}
for _, addr := range w.ListenAddresses() {
log.Info("Listening on ", addr)
}
go w.checkForAddressChanges()
go w.onAddrChange()
return w, nil
}
func (w *WakuNode) onAddrChange() {
for m := range w.addrChan {
ipStr, err := m.ValueForProtocol(ma.P_IP4)
if err != nil {
log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error()))
continue
}
ip := net.ParseIP(ipStr)
if !ip.IsLoopback() && !ip.IsUnspecified() {
if w.opts.enableDiscV5 {
err := w.discoveryV5.UpdateAddr(ip)
if err != nil {
log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s: %s", ip, err.Error()))
continue
}
}
}
}
}
func (w *WakuNode) logAddress(addr ma.Multiaddr) {
log.Info("Listening on ", addr)
// TODO: make this optional depending on DNS Disc being enabled
if w.opts.privKey != nil {
enr, ip, err := utils.GetENRandIP(addr, w.opts.privKey)
if err != nil {
log.Error("could not obtain ENR record from multiaddress", err)
} else {
log.Info(fmt.Sprintf("ENR for IP %s: %s", ip, enr))
}
}
}
func (w *WakuNode) checkForAddressChanges() {
addrs := w.ListenAddresses()
first := make(chan struct{}, 1)
first <- struct{}{}
for {
select {
case <-w.quit:
return
case <-first:
for _, addr := range addrs {
w.logAddress(addr)
}
case <-w.addressChangesSub.Out():
newAddrs := w.ListenAddresses()
print := false
if len(addrs) != len(newAddrs) {
print = true
} else {
for i := range newAddrs {
if addrs[i].String() != newAddrs[i].String() {
print = true
break
}
}
}
if print {
addrs = newAddrs
log.Warn("Change in host multiaddresses")
for _, addr := range newAddrs {
w.addrChan <- addr
w.logAddress(addr)
}
}
}
}
}
func (w *WakuNode) Start() error {
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
if w.opts.enableStore {
@ -141,6 +241,17 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...))
}
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
if err != nil {
return err
}
}
if w.opts.enableDiscV5 {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
}
err := w.mountRelay(w.opts.wOpts...)
if err != nil {
return err
@ -178,12 +289,14 @@ func (w *WakuNode) Stop() {
defer w.cancel()
close(w.quit)
close(w.addrChan)
w.bcaster.Close()
defer w.connectionNotif.Close()
defer w.protocolEventSub.Close()
defer w.identificationEventSub.Close()
defer w.addressChangesSub.Close()
if w.rendezvous != nil {
w.rendezvous.Stop()
@ -233,6 +346,14 @@ func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
return w.lightPush
}
func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 {
return w.discoveryV5
}
func (w *WakuNode) Broadcaster() v2.Broadcaster {
return w.bcaster
}
func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...)
@ -241,7 +362,7 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
}
if w.opts.enableRelay {
_, err = w.relay.Subscribe(w.ctx, nil)
_, err = w.relay.Subscribe(w.ctx)
if err != nil {
return err
}
@ -252,6 +373,41 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
return err
}
func (w *WakuNode) mountDiscV5() error {
wakuFlag := discv5.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
discV5Options := []discv5.DiscoveryV5Option{
discv5.WithBootnodes(w.opts.discV5bootnodes),
discv5.WithUDPPort(w.opts.udpPort),
discv5.WithAutoUpdate(w.opts.discV5autoUpdate),
}
addr := w.ListenAddresses()[0]
ipStr, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
return err
}
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
return err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return err
}
discoveryV5, err := discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, wakuFlag, discV5Options...)
if err != nil {
return err
}
w.discoveryV5 = discoveryV5
return nil
}
func (w *WakuNode) mountRendezvous() error {
w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage)
@ -270,14 +426,16 @@ func (w *WakuNode) startStore() {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
t := time.NewTicker(time.Second)
peerVerif:
for {
select {
case <-w.quit:
return
case <-t.C:
case <-ticker.C:
_, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3))
if err == nil {
break peerVerif
@ -383,7 +541,7 @@ func (w *WakuNode) PeerCount() int {
return len(w.host.Network().Peers())
}
func (w *WakuNode) Peers() PeerStats {
func (w *WakuNode) PeerStats() PeerStats {
p := make(PeerStats)
for _, peerID := range w.host.Network().Peers() {
protocols, err := w.host.Peerstore().GetProtocols(peerID)
@ -395,6 +553,26 @@ func (w *WakuNode) Peers() PeerStats {
return p
}
func (w *WakuNode) Peers() ([]*Peer, error) {
var peers []*Peer
for _, peerId := range w.host.Peerstore().Peers() {
connected := w.host.Network().Connectedness(peerId) == network.Connected
protocols, err := w.host.Peerstore().GetProtocols(peerId)
if err != nil {
return nil, err
}
addrs := w.host.Peerstore().Addrs(peerId)
peers = append(peers, &Peer{
ID: peerId,
Protocols: protocols,
Connected: connected,
Addrs: addrs,
})
}
return peers, nil
}
// startKeepAlive creates a go routine that periodically pings connected peers.
// This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost)
@ -402,6 +580,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of ", t)
ticker := time.NewTicker(t)
defer ticker.Stop()
go func() {
for {
@ -419,7 +598,6 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}
}
case <-w.quit:
ticker.Stop()
return
}
}
@ -435,9 +613,9 @@ func pingPeer(ctx context.Context, host host.Host, peer peer.ID) {
select {
case res := <-pr:
if res.Error != nil {
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
}
case <-ctx.Done():
log.Error(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
}
}

View File

@ -6,6 +6,7 @@ import (
"net"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
@ -23,9 +24,11 @@ import (
const clientId string = "Go Waku v2 node"
type WakuNodeParameters struct {
hostAddr *net.TCPAddr
advertiseAddr *net.IP
multiAddr []ma.Multiaddr
addressFactory basichost.AddrsFactory
privKey *crypto.PrivKey
privKey *ecdsa.PrivateKey
libP2POpts []libp2p.Option
enableRelay bool
@ -45,6 +48,12 @@ type WakuNodeParameters struct {
rendevousStorage rendezvous.Storage
rendezvousOpts []pubsub.DiscoverOpt
enableDiscV5 bool
udpPort int
discV5bootnodes []*enode.Node
discV5Opts []pubsub.DiscoverOpt
discV5autoUpdate bool
keepAliveInterval time.Duration
enableLightPush bool
@ -54,6 +63,11 @@ type WakuNodeParameters struct {
type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithWakuRelay(),
}
// MultiAddresses return the list of multiaddresses configured in the node
func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr {
return w.multiAddr
@ -61,39 +75,43 @@ func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr {
// Identity returns a libp2p option containing the identity used by the node
func (w WakuNodeParameters) Identity() config.Option {
return libp2p.Identity(*w.privKey)
return libp2p.Identity(*w.GetPrivKey())
}
// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a list of net endpoint addresses
func WithHostAddress(hostAddr []*net.TCPAddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
var multiAddresses []ma.Multiaddr
for _, addr := range hostAddr {
hostAddrMA, err := manet.FromNetAddr(addr)
if err != nil {
return err
}
multiAddresses = append(multiAddresses, hostAddrMA)
}
func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory {
return w.addressFactory
}
params.multiAddr = append(params.multiAddr, multiAddresses...)
// WithHostAddress is a WakuNodeOption that configures libp2p to listen on a specific address
func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.hostAddr = hostAddr
hostAddrMA, err := manet.FromNetAddr(hostAddr)
if err != nil {
return err
}
params.multiAddr = append(params.multiAddr, hostAddrMA)
return nil
}
}
// WithAdvertiseAddress is a WakuNodeOption that allows overriding the addresses used in the waku node with custom values
func WithAdvertiseAddress(addressesToAdvertise []*net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption {
// WithAdvertiseAddress is a WakuNodeOption that allows overriding the address used in the waku node with custom value
func WithAdvertiseAddress(address *net.TCPAddr, enableWS bool, wsPort int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.advertiseAddr = &address.IP
advertiseAddress, err := manet.FromNetAddr(address)
if err != nil {
return err
}
params.addressFactory = func([]ma.Multiaddr) []ma.Multiaddr {
var result []multiaddr.Multiaddr
for _, adv := range addressesToAdvertise {
addr, _ := manet.FromNetAddr(adv)
result = append(result, addr)
if enableWS {
wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", adv.IP.String(), wsPort))
result = append(result, wsMa)
}
result = append(result, advertiseAddress)
if enableWS {
wsMa, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/ws", address, wsPort))
result = append(result, wsMa)
}
return result
}
@ -112,12 +130,16 @@ func WithMultiaddress(addresses []ma.Multiaddr) WakuNodeOption {
// WithPrivateKey is used to set an ECDSA private key in a libp2p node
func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption {
return func(params *WakuNodeParameters) error {
privk := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey))
params.privKey = &privk
params.privKey = privKey
return nil
}
}
func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey {
privKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(w.privKey))
return &privKey
}
// WithLibP2POptions is a WakuNodeOption used to configure the libp2p node.
// This can potentially override any libp2p config that was set with other
// WakuNodeOption
@ -138,6 +160,18 @@ func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {
}
}
// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableDiscV5 = true
params.udpPort = udpPort
params.discV5bootnodes = bootnodes
params.discV5Opts = discoverOpts
params.discV5autoUpdate = autoUpdate
return nil
}
}
// WithRendezvous is a WakuOption used to enable go-waku-rendezvous discovery.
// It accepts an optional list of DiscoveryOpt options
func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
@ -232,7 +266,6 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
var DefaultLibP2POptions = []libp2p.Option{
libp2p.DefaultTransports,
libp2p.UserAgent(clientId),
libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts.
libp2p.EnableNATService(), // TODO: is this needed?)
libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)),
}

View File

@ -15,7 +15,6 @@ import (
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
@ -27,13 +26,6 @@ var (
)
type (
FilterSubscribeParameters struct {
host host.Host
selectedPeer peer.ID
}
FilterSubscribeOption func(*FilterSubscribeParameters)
Filter struct {
PeerID peer.ID
Topic string
@ -65,41 +57,32 @@ type (
// NOTE This is just a start, the design of this protocol isn't done yet. It
// should be direct payload exchange (a la req-resp), not be coupled with the
// relay protocol.
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.selectedPeer = p
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
log.Error(err)
}
}
func WithAutomaticPeerSelection() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
wf := new(WakuFilter)
wf.ctx = ctx
wf.MsgC = make(chan *protocol.Envelope)
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap()
wf.subscribers = NewSubscribers()
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
go wf.FilterListener()
func DefaultOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticPeerSelection(),
if wf.isFullNode {
log.Info("Filter protocol started")
} else {
log.Info("Filter protocol started (only client mode)")
}
return wf
}
func (wf *WakuFilter) onRequest(s network.Stream) {
@ -148,32 +131,6 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
}
}
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
log.Error(err)
}
wf := new(WakuFilter)
wf.ctx = ctx
wf.MsgC = make(chan *protocol.Envelope)
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap()
wf.subscribers = NewSubscribers()
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
go wf.FilterListener()
if wf.isFullNode {
log.Info("Filter protocol started")
} else {
log.Info("Filter protocol started (only client mode)")
}
return wf
}
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}

View File

@ -0,0 +1,52 @@
package filter
import (
"context"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/utils"
)
type (
FilterSubscribeParameters struct {
host host.Host
selectedPeer peer.ID
}
FilterSubscribeOption func(*FilterSubscribeParameters)
)
func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.selectedPeer = p
}
}
func WithAutomaticPeerSelection() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func DefaultOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticPeerSelection(),
}
}

View File

@ -77,13 +77,13 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
log.Info("lightpush push request")
response := new(pb.PushResponse)
if !wakuLP.IsClientOnly() {
pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic)
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
_, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic)
_, err := wakuLP.relay.PublishToTopic(wakuLP.ctx, message, pubSubTopic)
if err != nil {
response.IsSuccess = false
@ -181,14 +181,14 @@ func (wakuLP *WakuLightPush) Stop() {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
}
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) {
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic))
req.PubsubTopic = topic
response, err := wakuLP.request(ctx, req, opts...)
if err != nil {
@ -202,3 +202,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessag
return nil, errors.New(response.Info)
}
}
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, opts ...LightPushOption) ([]byte, error) {
return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...)
}

View File

@ -24,11 +24,9 @@ import (
var log = logging.Logger("wakurelay")
type Topic string
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
var DefaultWakuTopic Topic = Topic(waku_proto.DefaultPubsubTopic().String())
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
type WakuRelay struct {
host host.Host
@ -37,13 +35,12 @@ type WakuRelay struct {
bcaster v2.Broadcaster
// TODO: convert to concurrent maps
topics map[Topic]struct{}
topicsMutex sync.Mutex
wakuRelayTopics map[Topic]*pubsub.Topic
relaySubs map[Topic]*pubsub.Subscription
wakuRelayTopics map[string]*pubsub.Topic
relaySubs map[string]*pubsub.Subscription
// TODO: convert to concurrent maps
subscriptions map[Topic][]*Subscription
subscriptions map[string][]*Subscription
subscriptionsMutex sync.Mutex
}
@ -56,10 +53,9 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.topics = make(map[Topic]struct{})
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
w.relaySubs = make(map[Topic]*pubsub.Subscription)
w.subscriptions = make(map[Topic][]*Subscription)
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
w.relaySubs = make(map[string]*pubsub.Subscription)
w.subscriptions = make(map[string][]*Subscription)
w.bcaster = bcaster
// default options required by WakuRelay
@ -96,12 +92,12 @@ func (w *WakuRelay) PubSub() *pubsub.PubSub {
return w.pubsub
}
func (w *WakuRelay) Topics() []Topic {
func (w *WakuRelay) Topics() []string {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
var result []Topic
for topic := range w.topics {
var result []string
for topic := range w.relaySubs {
result = append(result, topic)
}
return result
@ -111,11 +107,10 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
w.pubsub = pubSub
}
func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
w.topics[topic] = struct{}{}
pubSubTopic, ok := w.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
newTopic, err := w.pubsub.Join(string(topic))
@ -128,7 +123,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
return pubSubTopic, nil
}
func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error) {
func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) {
sub, ok := w.relaySubs[topic]
if !ok {
pubSubTopic, err := w.upsertTopic(topic)
@ -148,7 +143,7 @@ func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error
return sub, nil
}
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) {
func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) {
// Publish a `WakuMessage` to a PubSub topic.
if w.pubsub == nil {
return nil, errors.New("PubSub hasn't been set")
@ -158,7 +153,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic
return nil, errors.New("message can't be null")
}
pubSubTopic, err := w.upsertTopic(GetTopic(topic))
pubSubTopic, err := w.upsertTopic(topic)
if err != nil {
return nil, err
@ -179,12 +174,8 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic
return hash, nil
}
func GetTopic(topic *Topic) Topic {
var t Topic = DefaultWakuTopic
if topic != nil {
t = *topic
}
return t
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) {
return w.PublishToTopic(ctx, message, DefaultWakuTopic)
}
func (w *WakuRelay) Stop() {
@ -200,11 +191,10 @@ func (w *WakuRelay) Stop() {
w.subscriptions = nil
}
func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, error) {
func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
t := GetTopic(topic)
sub, err := w.subscribe(t)
sub, err := w.subscribe(topic)
if err != nil {
return nil, err
@ -219,23 +209,26 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription,
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
w.subscriptions[t] = append(w.subscriptions[t], subscription)
w.subscriptions[topic] = append(w.subscriptions[topic], subscription)
if w.bcaster != nil {
w.bcaster.Register(subscription.C)
}
go w.subscribeToTopic(t, subscription, sub)
go w.subscribeToTopic(topic, subscription, sub)
return subscription, nil
}
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error {
if _, ok := w.topics[topic]; !ok {
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
return w.SubscribeToTopic(ctx, DefaultWakuTopic)
}
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
if _, ok := w.relaySubs[topic]; !ok {
return fmt.Errorf("topics %s is not subscribed", (string)(topic))
}
log.Info("Unsubscribing from topic ", topic)
delete(w.topics, topic)
for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
@ -268,7 +261,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
log.Error(fmt.Errorf("subscription failed: %w", err))
sub.Cancel()
close(msgChannel)
for _, subscription := range w.subscriptions[Topic(sub.Topic())] {
for _, subscription := range w.subscriptions[sub.Topic()] {
subscription.Unsubscribe()
}
}
@ -279,7 +272,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
return msgChannel
}
func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *pubsub.Subscription) {
func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) {
ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay"))
if err != nil {
log.Error(err)

View File

@ -74,12 +74,15 @@ func (self *MessageQueue) cleanOlderRecords() {
func (self *MessageQueue) checkForOlderRecords(d time.Duration) {
ticker := time.NewTicker(d)
defer ticker.Stop()
select {
case <-self.quit:
return
case <-ticker.C:
self.cleanOlderRecords()
for {
select {
case <-self.quit:
return
case <-ticker.C:
self.cleanOlderRecords()
}
}
}

View File

@ -2,11 +2,20 @@ package utils
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"math"
"math/rand"
"net"
"strconv"
"sync"
"time"
ma "github.com/multiformats/go-multiaddr"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
@ -112,3 +121,62 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str
return nil, ErrNoPeersAvailable
}
}
func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()})
if err != nil {
return nil, err
}
return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID))
}
func EnodeToPeerInfo(node *enode.Node) (*peer.AddrInfo, error) {
address, err := EnodeToMultiAddr(node)
if err != nil {
return nil, err
}
return peer.AddrInfoFromP2pAddr(address)
}
func GetENRandIP(addr ma.Multiaddr, privK *ecdsa.PrivateKey) (*enode.Node, *net.TCPAddr, error) {
ip, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, nil, err
}
portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, nil, err
}
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
return nil, nil, err
}
r := &enr.Record{}
if port > 0 && port <= math.MaxUint16 {
r.Set(enr.TCP(uint16(port))) // lgtm [go/incorrect-integer-conversion]
} else {
return nil, nil, fmt.Errorf("could not set port %d", port)
}
r.Set(enr.IP(net.ParseIP(ip)))
err = enode.SignV4(r, privK)
if err != nil {
return nil, nil, err
}
node, err := enode.New(enode.ValidSchemes, r)
return node, tcpAddr, err
}

View File

@ -1,4 +1,4 @@
package discovery
package utils
import (
"crypto/ecdsa"

5
vendor/modules.txt vendored
View File

@ -447,10 +447,11 @@ github.com/spacemonkeygo/spacelog
github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.1
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20211112132622-e176975aede1
# github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5
github.com/status-im/go-waku/waku/persistence
github.com/status-im/go-waku/waku/v2
github.com/status-im/go-waku/waku/v2/discovery
github.com/status-im/go-waku/waku/v2/discv5
github.com/status-im/go-waku/waku/v2/dnsdisc
github.com/status-im/go-waku/waku/v2/metrics
github.com/status-im/go-waku/waku/v2/node
github.com/status-im/go-waku/waku/v2/protocol

View File

@ -39,7 +39,11 @@ type Config struct {
LightpushNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
WakuRendezvousNodes []string `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
EnableDiscV5 bool `toml:",omitempty"`
DiscoveryLimit int `toml:",omitempty"`
AutoUpdate bool `toml:",omitempty"`
UDPPort int `toml:",omitempty"`
}
var DefaultConfig = Config{
@ -49,4 +53,42 @@ var DefaultConfig = Config{
KeepAliveInterval: 10, // second
DiscoveryLimit: 40,
MinPeersForRelay: 2, // TODO: determine correct value with Vac team
UDPPort: 9000,
AutoUpdate: false,
}
func setDefaults(cfg *Config) *Config {
if cfg == nil {
cfg = new(Config)
}
if cfg.MaxMessageSize == 0 {
cfg.MaxMessageSize = DefaultConfig.MaxMessageSize
}
if cfg.Host == "" {
cfg.Host = DefaultConfig.Host
}
if cfg.Port == 0 {
cfg.Port = DefaultConfig.Port
}
if cfg.KeepAliveInterval == 0 {
cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
}
if cfg.DiscoveryLimit == 0 {
cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit
}
if cfg.MinPeersForRelay == 0 {
cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay
}
if cfg.UDPPort == 0 {
cfg.UDPPort = DefaultConfig.UDPPort
}
return cfg
}

View File

@ -48,6 +48,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
"github.com/libp2p/go-libp2p"
@ -57,7 +58,7 @@ import (
libp2pproto "github.com/libp2p/go-libp2p-core/protocol"
rendezvous "github.com/status-im/go-waku-rendezvous"
"github.com/status-im/go-waku/waku/v2/discovery"
"github.com/status-im/go-waku/waku/v2/dnsdisc"
"github.com/status-im/go-waku/waku/v2/protocol"
wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
@ -69,7 +70,7 @@ import (
"github.com/status-im/status-go/wakuv2/common"
"github.com/status-im/status-go/wakuv2/persistence"
libp2pdisc "github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/discovery"
node "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/store"
@ -127,38 +128,6 @@ type Waku struct {
logger *zap.Logger
}
func setDefaults(cfg *Config) *Config {
if cfg == nil {
cfg = new(Config)
}
if cfg.MaxMessageSize == 0 {
cfg.MaxMessageSize = DefaultConfig.MaxMessageSize
}
if cfg.Host == "" {
cfg.Host = DefaultConfig.Host
}
if cfg.Port == 0 {
cfg.Port = DefaultConfig.Port
}
if cfg.KeepAliveInterval == 0 {
cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
}
if cfg.DiscoveryLimit == 0 {
cfg.DiscoveryLimit = DefaultConfig.DiscoveryLimit
}
if cfg.MinPeersForRelay == 0 {
cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay
}
return cfg
}
// New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, error) {
if logger == nil {
@ -245,13 +214,25 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
opts := []node.WakuNodeOption{
node.WithLibP2POptions(libp2pOpts...),
node.WithPrivateKey(privateKey),
node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithHostAddress(hostAddr),
node.WithConnectionStatusChannel(connStatusChan),
node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second),
}
if cfg.Rendezvous {
opts = append(opts, node.WithRendezvous(pubsub.WithDiscoveryOpts(libp2pdisc.Limit(cfg.DiscoveryLimit))))
opts = append(opts, node.WithRendezvous(pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit))))
}
if cfg.EnableDiscV5 {
var bootnodes []*enode.Node
for _, addr := range cfg.DiscV5BootstrapNodes {
bootnode, err := enode.Parse(enode.ValidSchemes, addr)
if err != nil {
return nil, err
}
bootnodes = append(bootnodes, bootnode)
}
opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit))))
}
if cfg.LightClient {
@ -327,7 +308,7 @@ func (w *Waku) dnsDiscover(enrtreeAddress string, protocol libp2pproto.ID, apply
if !ok {
w.dnsAddressCacheLock.Lock()
var err error
multiaddresses, err = discovery.RetrieveNodes(ctx, enrtreeAddress)
multiaddresses, err = dnsdisc.RetrieveNodes(ctx, enrtreeAddress)
w.dnsAddressCache[enrtreeAddress] = multiaddresses
w.dnsAddressCacheLock.Unlock()
if err != nil {
@ -396,7 +377,7 @@ func (w *Waku) runRelayMsgLoop() {
return
}
sub, err := w.node.Relay().Subscribe(context.Background(), nil)
sub, err := w.node.Relay().Subscribe(context.Background())
if err != nil {
fmt.Println("Could not subscribe:", err)
return
@ -429,8 +410,6 @@ func (w *Waku) runFilterMsgLoop() {
}
func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) {
pubsubTopic := relay.GetTopic(nil)
var contentTopics []string
for _, topic := range topics {
contentTopics = append(contentTopics, common.BytesToTopic(topic).ContentTopic())
@ -438,7 +417,7 @@ func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) {
var err error
contentFilter := filter.ContentFilter{
Topic: string(pubsubTopic),
Topic: relay.DefaultWakuTopic,
ContentTopics: contentTopics,
}
@ -764,7 +743,7 @@ func (w *Waku) Unsubscribe(id string) error {
f := w.filters.Get(id)
if f != nil && w.settings.LightClient {
contentFilter := filter.ContentFilter{
Topic: string(relay.GetTopic(nil)),
Topic: relay.DefaultWakuTopic,
}
for _, topic := range f.Topics {
contentFilter.ContentTopics = append(contentFilter.ContentTopics, common.BytesToTopic(topic).ContentTopic())
@ -795,8 +774,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error {
}
func (w *Waku) notEnoughPeers() bool {
topic := string(relay.GetTopic(nil))
numPeers := len(w.node.Relay().PubSub().ListPeers(topic))
numPeers := len(w.node.Relay().PubSub().ListPeers(relay.DefaultWakuTopic))
return numPeers <= w.settings.MinPeersForRelay
}
@ -813,10 +791,10 @@ func (w *Waku) broadcast() {
if w.settings.LightClient || w.notEnoughPeers() {
log.Debug("publishing message via lightpush", zap.Any("hash", hexutil.Encode(hash)))
_, err = w.node.Lightpush().Publish(context.Background(), msg, nil)
_, err = w.node.Lightpush().Publish(context.Background(), msg)
} else {
log.Debug("publishing message via relay", zap.Any("hash", hexutil.Encode(hash)))
_, err = w.node.Relay().Publish(context.Background(), msg, nil)
_, err = w.node.Relay().Publish(context.Background(), msg)
}
if err != nil {
@ -856,7 +834,7 @@ func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
_, alreadyCached := w.envelopes[gethcommon.BytesToHash(hash)]
w.poolMu.Unlock()
if !alreadyCached {
envelope := wakuprotocol.NewEnvelope(msg, string(relay.GetTopic(nil)))
envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic)
recvMessage := common.NewReceivedMessage(envelope)
w.postEvent(recvMessage) // notify the local node about the new message
w.addEnvelope(recvMessage)
@ -875,7 +853,7 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
StartTime: float64(from),
EndTime: float64(to),
ContentTopics: strTopics,
Topic: string(relay.DefaultWakuTopic),
Topic: relay.DefaultWakuTopic,
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
@ -887,7 +865,7 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
}
for _, msg := range result.Messages {
envelope := wakuprotocol.NewEnvelope(msg, string(relay.DefaultWakuTopic))
envelope := wakuprotocol.NewEnvelope(msg, relay.DefaultWakuTopic)
w.logger.Debug("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())))
_, err = w.OnNewEnvelopes(envelope)
if err != nil {
@ -1035,7 +1013,24 @@ func (w *Waku) PeerCount() int {
}
func (w *Waku) Peers() map[string][]string {
return FormatPeerStats(w.node.Peers())
return FormatPeerStats(w.node.PeerStats())
}
func (w *Waku) StartDiscV5() error {
if w.node.DiscV5() == nil {
return errors.New("discv5 is not setup")
}
return w.node.DiscV5().Start()
}
func (w *Waku) StopDiscV5() error {
if w.node.DiscV5() == nil {
return errors.New("discv5 is not setup")
}
w.node.DiscV5().Stop()
return nil
}
func (w *Waku) AddStorePeer(address string) (string, error) {