From 6ae4d4fce275eb5da0f0967c81fe7239828c2ea3 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 16 Nov 2021 10:22:01 -0400 Subject: [PATCH] feat: discoveryV5 - part1 (#149) --- examples/chat2/main.go | 4 +- waku/node.go | 8 +- waku/v2/discv5/discover.go | 357 ++++++++++++++++++ waku/v2/discv5/discover_test.go | 95 +++++ waku/v2/{discovery => dnsdisc}/enr.go | 17 +- waku/v2/{discovery => dnsdisc}/enr_test.go | 5 +- waku/v2/{discovery => dnsdisc}/resolver.go | 2 +- .../{discovery => dnsdisc}/resolver_test.go | 2 +- waku/v2/utils/peer.go | 22 ++ waku/v2/{discovery => utils}/public_key.go | 2 +- .../{discovery => utils}/public_key_test.go | 2 +- 11 files changed, 490 insertions(+), 26 deletions(-) create mode 100644 waku/v2/discv5/discover.go create mode 100644 waku/v2/discv5/discover_test.go rename waku/v2/{discovery => dnsdisc}/enr.go (72%) rename waku/v2/{discovery => dnsdisc}/enr_test.go (88%) rename waku/v2/{discovery => dnsdisc}/resolver.go (96%) rename waku/v2/{discovery => dnsdisc}/resolver_test.go (96%) rename waku/v2/{discovery => utils}/public_key.go (99%) rename waku/v2/{discovery => utils}/public_key_test.go (98%) diff --git a/examples/chat2/main.go b/examples/chat2/main.go index 69e27f9a..91036a1b 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -23,7 +23,7 @@ import ( wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol" "github.com/multiformats/go-multiaddr" - "github.com/status-im/go-waku/waku/v2/discovery" + "github.com/status-im/go-waku/waku/v2/dnsdisc" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" @@ -169,7 +169,7 @@ func main() { if enableDiscovery && dnsDiscoveryUrl != "" { ui.displayMessage(fmt.Sprintf("attempting DNS discovery with %s", dnsDiscoveryUrl)) - multiaddresses, err := discovery.RetrieveNodes(ctx, dnsDiscoveryUrl, discovery.WithNameserver(dnsDiscoveryNameServer)) + multiaddresses, err := dnsdisc.RetrieveNodes(ctx, dnsDiscoveryUrl, dnsdisc.WithNameserver(dnsDiscoveryNameServer)) if err != nil { ui.displayMessage("DNS discovery error: " + err.Error()) } else { diff --git a/waku/node.go b/waku/node.go index 19fe3533..9ddfb2bb 100644 --- a/waku/node.go +++ b/waku/node.go @@ -22,7 +22,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" - libp2pdisc "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p-core/protocol" @@ -33,7 +33,7 @@ import ( "github.com/status-im/go-waku/waku/metrics" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence/sqlite" - "github.com/status-im/go-waku/waku/v2/discovery" + "github.com/status-im/go-waku/waku/v2/dnsdisc" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" @@ -159,7 +159,7 @@ func Execute(options Options) { } if options.Rendezvous.Enable { - nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(libp2pdisc.Limit(45), libp2pdisc.TTL(time.Duration(20)*time.Second)))) + nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second)))) } wakuNode, err := node.New(ctx, nodeOpts...) @@ -205,7 +205,7 @@ func Execute(options Options) { if options.DNSDiscovery.URL != "" { log.Info("attempting DNS discovery with ", options.DNSDiscovery.URL) - multiaddresses, err := discovery.RetrieveNodes(ctx, options.DNSDiscovery.URL, discovery.WithNameserver(options.DNSDiscovery.Nameserver)) + multiaddresses, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver)) if err != nil { log.Warn("dns discovery error ", err) } else { diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go new file mode 100644 index 00000000..27c86e89 --- /dev/null +++ b/waku/v2/discv5/discover.go @@ -0,0 +1,357 @@ +package discv5 + +import ( + "context" + "crypto/ecdsa" + "math/rand" + "net" + "sync" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/utils" +) + +var log = logging.Logger("waku_discv5") + +type DiscoveryV5 struct { + discovery.Discovery + + params *discV5Parameters + host host.Host + config discover.Config + udpAddr *net.UDPAddr + listener *discover.UDPv5 + localnode *enode.LocalNode + + peerCache peerCache +} + +type peerCache struct { + sync.RWMutex + recs map[peer.ID]peerRecord + rng *rand.Rand +} + +type peerRecord struct { + expire int64 + peer peer.AddrInfo +} + +type discV5Parameters struct { + bootnodes []*enode.Node + advertiseAddress *net.IP + udpPort int +} + +const WakuENRField = "waku2" + +// WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/). +type WakuEnrBitfield = uint8 + +type DiscoveryV5Option func(*discV5Parameters) + +func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.bootnodes = bootnodes + } +} + +func WithAdvertiseAddress(advertiseAddr net.IP) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.advertiseAddress = &advertiseAddr + } +} + +func WithUDPPort(port int) DiscoveryV5Option { + return func(params *discV5Parameters) { + params.udpPort = port + } +} + +func DefaultOptions() []DiscoveryV5Option { + return []DiscoveryV5Option{ + WithUDPPort(9000), + } +} + +func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield { + var v uint8 = 0 + + if lightpush { + v |= (1 << 3) + } + + if filter { + v |= (1 << 2) + } + + if store { + v |= (1 << 1) + } + + if relay { + v |= (1 << 0) + } + + return v +} + +func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.PrivateKey, wakuFlags WakuEnrBitfield, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { + params := new(discV5Parameters) + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + + localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddress) + if err != nil { + return nil, err + } + + return &DiscoveryV5{ + host: host, + params: params, + peerCache: peerCache{ + rng: rand.New(rand.NewSource(rand.Int63())), + recs: make(map[peer.ID]peerRecord), + }, + localnode: localnode, + config: discover.Config{ + PrivateKey: priv, + Bootnodes: params.bootnodes, + }, + udpAddr: &net.UDPAddr{ + IP: ipAddr, + Port: params.udpPort, + }, + }, nil +} + +func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags WakuEnrBitfield, advertiseAddr *net.IP) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetFallbackUDP(udpPort) + + localnode.Set(enr.WithEntry(WakuENRField, wakuFlags)) + + localnode.Set(enr.IP(ipAddr)) // Test if IP changes in p2p/enode/localnode.go ? + localnode.Set(enr.UDP(udpPort)) + localnode.Set(enr.TCP(tcpPort)) + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + + return localnode, nil +} + +func (d *DiscoveryV5) Start() error { + conn, err := net.ListenUDP("udp", d.udpAddr) + if err != nil { + return err + } + + listener, err := discover.ListenV5(conn, d.localnode, d.config) + if err != nil { + return err + } + + d.listener = listener + + log.Info("Started Discovery V5 at %s:%d", d.udpAddr.IP, d.udpAddr.Port) + + return nil +} + +func (d *DiscoveryV5) Stop() { + d.listener.Close() +} + +func isWakuNode(node *enode.Node) bool { + enrField := new(WakuEnrBitfield) + if err := node.Record().Load(enr.WithEntry(WakuENRField, &enrField)); err != nil { + if !enr.IsNotFound(err) { + log.Error("could not retrieve port for enr ", node) + } + return false + } + + if enrField != nil { + return *enrField != uint8(0) + } + + return false +} + +func hasTCPPort(node *enode.Node) bool { + enrTCP := new(enr.TCP) + if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil { + if !enr.IsNotFound(err) { + log.Error("could not retrieve port for enr ", node) + } + return false + } + + return true +} + +func (d *DiscoveryV5) evaluateNode(node *enode.Node) bool { + if node == nil || node.IP() == nil { + return false + } + + if !isWakuNode(node) || !hasTCPPort(node) { + return false + } + + _, err := utils.EnodeToPeerInfo(node) + if err != nil { + log.Error("could not obtain peer info from enode:", err) + return false + } + + return true +} + +func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + // Get options + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return 0, err + } + + // TODO: once discv5 spec introduces capability and topic discovery, implement this function + + return 20 * time.Minute, nil +} + +func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan struct{}) { + for { + if len(d.peerCache.recs) >= limit { + break + } + + exists := iterator.Next() + if !exists { + break + } + + address, err := utils.EnodeToMultiAddr(iterator.Node()) + if err != nil { + log.Error(err) + continue + } + + peerInfo, err := peer.AddrInfoFromP2pAddr(address) + if err != nil { + log.Error(err) + continue + } + + d.peerCache.recs[peerInfo.ID] = peerRecord{ + expire: time.Now().Unix() + 3600, // Expires in 1hr + peer: *peerInfo, + } + } + + close(doneCh) +} + +func (d *DiscoveryV5) removeExpiredPeers() int { + // Remove all expired entries from cache + currentTime := time.Now().Unix() + newCacheSize := len(d.peerCache.recs) + + for p := range d.peerCache.recs { + rec := d.peerCache.recs[p] + if rec.expire < currentTime { + newCacheSize-- + delete(d.peerCache.recs, p) + } + } + + return newCacheSize +} + +func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + // Get options + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return nil, err + } + + const maxLimit = 100 + limit := options.Limit + if limit == 0 || limit > maxLimit { + limit = maxLimit + } + + // We are ignoring the topic. Future versions might use a map[string]*peerCache instead where the string represents the pubsub topic + + d.peerCache.Lock() + defer d.peerCache.Unlock() + + cacheSize := d.removeExpiredPeers() + + // Discover new records if we don't have enough + if cacheSize < limit { + iterator := d.listener.RandomNodes() + iterator = enode.Filter(iterator, d.evaluateNode) + defer iterator.Close() + + doneCh := make(chan struct{}) + go d.iterate(iterator, limit, doneCh) + + select { + case <-ctx.Done(): + case <-doneCh: + } + } + + // Randomize and fill channel with available records + count := len(d.peerCache.recs) + if limit < count { + count = limit + } + + chPeer := make(chan peer.AddrInfo, count) + + perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count] + permSet := make(map[int]int) + for i, v := range perm { + permSet[v] = i + } + + sendLst := make([]*peer.AddrInfo, count) + iter := 0 + for k := range d.peerCache.recs { + if sendIndex, ok := permSet[iter]; ok { + peerInfo := d.peerCache.recs[k].peer + sendLst[sendIndex] = &peerInfo + } + iter++ + } + + for _, send := range sendLst { + chPeer <- *send + } + + close(chPeer) + + return chPeer, err +} diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go new file mode 100644 index 00000000..f4d8bfa1 --- /dev/null +++ b/waku/v2/discv5/discover_test.go @@ -0,0 +1,95 @@ +package discv5 + +import ( + "context" + "crypto/ecdsa" + "fmt" + "net" + "testing" + "time" + + gcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/tests" + "github.com/stretchr/testify/require" + + "github.com/libp2p/go-libp2p" + libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" +) + +func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) { + privKey, err := gcrypto.GenerateKey() + require.NoError(t, err) + + sPrivKey := libp2pcrypto.PrivKey((*libp2pcrypto.Secp256k1PrivateKey)(privKey)) + + port, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + + sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + require.NoError(t, err) + + host, err := libp2p.New( + context.Background(), + libp2p.ListenAddrs(sourceMultiAddr), + libp2p.Identity(sPrivKey), + ) + require.NoError(t, err) + + return host, port, privKey +} + +func TestDiscV5(t *testing.T) { + // Host1 <-> Host2 <-> Host3 + + host1, tcpPort1, prvKey1 := createHost(t) + udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + d1, err := NewDiscoveryV5(host1, net.IPv4(127, 0, 0, 1), tcpPort1, prvKey1, NewWakuEnrBitfield(true, true, true, true), WithUDPPort(udpPort1)) + require.NoError(t, err) + + host2, tcpPort2, prvKey2 := createHost(t) + udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + d2, err := NewDiscoveryV5(host2, net.IPv4(127, 0, 0, 1), tcpPort2, prvKey2, NewWakuEnrBitfield(true, true, true, true), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + require.NoError(t, err) + + host3, tcpPort3, prvKey3 := createHost(t) + udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + d3, err := NewDiscoveryV5(host3, net.IPv4(127, 0, 0, 1), tcpPort3, prvKey3, NewWakuEnrBitfield(true, true, true, true), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + require.NoError(t, err) + + err = d1.Start() + require.NoError(t, err) + + err = d2.Start() + require.NoError(t, err) + + err = d3.Start() + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + peerChan, err := d1.FindPeers(ctx, "", discovery.Limit(2)) + require.NoError(t, err) + + foundHost2 := false + foundHost3 := false + for p := range peerChan { + if p.Addrs[0].String() == host2.Addrs()[0].String() { + foundHost2 = true + } + + if p.Addrs[0].String() == host3.Addrs()[0].String() { + foundHost3 = true + + } + } + + require.True(t, foundHost2 && foundHost3) +} diff --git a/waku/v2/discovery/enr.go b/waku/v2/dnsdisc/enr.go similarity index 72% rename from waku/v2/discovery/enr.go rename to waku/v2/dnsdisc/enr.go index 3fccba24..0d9212a9 100644 --- a/waku/v2/discovery/enr.go +++ b/waku/v2/dnsdisc/enr.go @@ -1,12 +1,10 @@ -package discovery +package dnsdisc import ( "context" - "fmt" "github.com/ethereum/go-ethereum/p2p/dnsdisc" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/utils" ma "github.com/multiformats/go-multiaddr" ) @@ -44,7 +42,7 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) } for _, node := range tree.Nodes() { - m, err := EnodeToMultiAddr(node) + m, err := utils.EnodeToMultiAddr(node) if err != nil { return nil, err } @@ -54,12 +52,3 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) return multiAddrs, nil } - -func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { - peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()}) - if err != nil { - return nil, err - } - - return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID)) -} diff --git a/waku/v2/discovery/enr_test.go b/waku/v2/dnsdisc/enr_test.go similarity index 88% rename from waku/v2/discovery/enr_test.go rename to waku/v2/dnsdisc/enr_test.go index 708300dd..28c69274 100644 --- a/waku/v2/discovery/enr_test.go +++ b/waku/v2/dnsdisc/enr_test.go @@ -1,10 +1,11 @@ -package discovery +package dnsdisc import ( "context" "testing" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -13,7 +14,7 @@ func TestEnodeToMultiAddr(t *testing.T) { parsedNode := enode.MustParse(enr) expectedMultiAddr := "/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ" - actualMultiAddr, err := EnodeToMultiAddr(parsedNode) + actualMultiAddr, err := utils.EnodeToMultiAddr(parsedNode) require.NoError(t, err) require.Equal(t, expectedMultiAddr, actualMultiAddr.String()) } diff --git a/waku/v2/discovery/resolver.go b/waku/v2/dnsdisc/resolver.go similarity index 96% rename from waku/v2/discovery/resolver.go rename to waku/v2/dnsdisc/resolver.go index 9708f42e..8985b33b 100644 --- a/waku/v2/discovery/resolver.go +++ b/waku/v2/dnsdisc/resolver.go @@ -1,4 +1,4 @@ -package discovery +package dnsdisc import ( "context" diff --git a/waku/v2/discovery/resolver_test.go b/waku/v2/dnsdisc/resolver_test.go similarity index 96% rename from waku/v2/discovery/resolver_test.go rename to waku/v2/dnsdisc/resolver_test.go index b16a44ba..3d34b57f 100644 --- a/waku/v2/discovery/resolver_test.go +++ b/waku/v2/dnsdisc/resolver_test.go @@ -1,4 +1,4 @@ -package discovery +package dnsdisc import ( "context" diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 862363a4..2ecc3dea 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -3,10 +3,14 @@ package utils import ( "context" "errors" + "fmt" "math/rand" "sync" "time" + ma "github.com/multiformats/go-multiaddr" + + "github.com/ethereum/go-ethereum/p2p/enode" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -112,3 +116,21 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str return nil, ErrNoPeersAvailable } } + +func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { + peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()}) + if err != nil { + return nil, err + } + + return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID)) +} + +func EnodeToPeerInfo(node *enode.Node) (*peer.AddrInfo, error) { + address, err := EnodeToMultiAddr(node) + if err != nil { + return nil, err + } + + return peer.AddrInfoFromP2pAddr(address) +} diff --git a/waku/v2/discovery/public_key.go b/waku/v2/utils/public_key.go similarity index 99% rename from waku/v2/discovery/public_key.go rename to waku/v2/utils/public_key.go index d80770c8..937adc49 100644 --- a/waku/v2/discovery/public_key.go +++ b/waku/v2/utils/public_key.go @@ -1,4 +1,4 @@ -package discovery +package utils import ( "crypto/ecdsa" diff --git a/waku/v2/discovery/public_key_test.go b/waku/v2/utils/public_key_test.go similarity index 98% rename from waku/v2/discovery/public_key_test.go rename to waku/v2/utils/public_key_test.go index 6c77d823..165f88e8 100644 --- a/waku/v2/discovery/public_key_test.go +++ b/waku/v2/utils/public_key_test.go @@ -1,4 +1,4 @@ -package discovery +package utils import ( "crypto/ecdsa"